Pipeline Entities
Pipeline entities module.
Contains data container classes for the pipeline orchestration system.
Pipeline Data
Central data container for the Pipeline orchestration system.
This module provides the PipelineData class that serves as the central data container orchestrating all analysis data including trajectories, features, clustering results, and decomposition results.
- class mdxplain.pipeline.entities.pipeline_data.PipelineData(use_memmap: bool = False, cache_dir: str = './cache', chunk_size: int = 2000, dtype: type = <class 'numpy.float32'>, max_memory_gb: float = 6.0)
Central data container orchestrating all analysis data.
This class serves as the central data hub for the pipeline system, containing all trajectory data, computed features, clustering results, decomposition results, and future analysis modules.
The PipelineData serves as the central “God-Object” that gets passed around to managers, following the builder pattern while providing separation of concerns.
Attributes
- use_memmap : bool
Whether to use memory mapping for large datasets
- cache_dir : str
Directory for cache files when using memory mapping
- chunk_size : int
Chunk size for memory-efficient processing
- dtype : type
Data type for feature matrices (float32 or float64)
- max_memory_gb : float
Estimated maximum memory usage in GB for current data
- trajectory_data : TrajectoryData
Container for trajectory data and metadata
- feature_data : Dict[str, Dict[int, FeatureData]]
Nested dictionary of computed features by type and trajectory index
- selected_feature_data : Dict[str, FeatureSelectorData]
Dictionary of feature selection results by selection name
- decomposition_data : Dict[str, DecompositionData]
Dictionary of decomposition results by selection name
- cluster_data : Dict[str, ClusterData]
Dictionary of clustering results by cluster name
- data_selector_data : Dict[str, DataSelectorData]
Dictionary of data selector results by selector name
- data_selector_groups : Dict[str, DataSelectorGroup]
Dictionary of data selector groups by group name
- comparison_data : Dict[str, ComparisonData]
Dictionary of comparison results by comparison name
- feature_importance_data : Dict[str, FeatureImportanceData]
Dictionary of feature importance results by analysis name
- structure_visualization_data : Dict[str, StructureVisualizationData]
Dictionary of structure visualization data by session name
- custom_metadata : Dict[str, Any]
User-defined custom metadata attached to the pipeline state
Examples
Pipeline mode (automatic):
>>> pipeline = PipelineManager()
>>> # PipelineData is managed automatically
>>> pipeline.trajectory.load_trajectories('../data')
Standalone mode (manual):
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.load_trajectories(pipeline_data, '../data')
- __init__(use_memmap: bool = False, cache_dir: str = './cache', chunk_size: int = 2000, dtype: type = <class 'numpy.float32'>, max_memory_gb: float = 6.0)
Initialize the central pipeline data container.
Creates empty containers for all analysis data types that will be populated through the respective manager interfaces.
Parameters
- use_memmap : bool, default=False
Whether to use memory mapping for large datasets
- cache_dir : str, default="./cache"
Directory for cache files when using memory mapping
- chunk_size : int, default=2000
Chunk size for memory-efficient processing
- dtype : type, default=np.float32
Data type for feature matrices (float32 or float64). float32 needs half the memory of float64 and is sufficient for most MD analysis. Use float64 only if extreme numerical precision is required.
- max_memory_gb : float, default=6.0
Maximum memory in GB for dataset processing. Used for memory-aware sampling in algorithms like DecisionTree. Datasets exceeding this limit will be automatically sampled.
Returns
- None
Initializes PipelineData instance with empty data containers
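The interaction between dtype and max_memory_gb can be illustrated with a rough size estimate for a dense feature matrix. The sketch below is only an illustration: the helper matrix_size_gb and the dataset dimensions are hypothetical, and it assumes the limit is compared against the raw matrix size in GiB, which the documentation above does not specify.

```python
FLOAT32_BYTES = 4  # one numpy.float32 value occupies 4 bytes
FLOAT64_BYTES = 8  # one numpy.float64 value occupies 8 bytes


def matrix_size_gb(n_frames: int, n_features: int, bytes_per_value: int) -> float:
    """Size of a dense (n_frames, n_features) matrix in GiB (hypothetical helper)."""
    return n_frames * n_features * bytes_per_value / 1024**3


# Hypothetical dataset: 300,000 frames with 4,000 features each.
n_frames, n_features = 300_000, 4_000
size32 = matrix_size_gb(n_frames, n_features, FLOAT32_BYTES)
size64 = matrix_size_gb(n_frames, n_features, FLOAT64_BYTES)

max_memory_gb = 6.0  # documented default of PipelineData
fits32 = size32 <= max_memory_gb
fits64 = size64 <= max_memory_gb

print(f"float32: {size32:.2f} GiB, fits: {fits32}")
print(f"float64: {size64:.2f} GiB, fits: {fits64}")
```

Under these assumptions the float32 matrix stays below the default limit while the float64 matrix exceeds it, which is the situation in which sampling would be expected to apply.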
- clear_all_data() -> None
Clear all stored analysis data.
Resets all data containers to empty state, effectively clearing all computed results while preserving the container structure. Useful for starting fresh analysis or freeing memory.
Returns
- None
Clears all data containers in-place
Examples
>>> pipeline_data = PipelineData()
>>> # ... after computations ...
>>> pipeline_data.clear_all_data()
- update_max_memory_from_trajectories(max_atoms: int) -> None
Update memory estimate after trajectory loading.
Parameters
- max_atoms : int
Maximum number of atoms across all trajectories
Returns
- None
Updates max_memory_gb based on atom count
- update_max_memory_from_features(n_features: int) -> None
Update memory estimate after feature computation.
Parameters
- n_features : int
Actual number of features computed
Returns
- None
Updates max_memory_gb based on actual feature count
- get_data_summary() -> Dict[str, Any]
Get summary information about all stored data.
Provides an overview of all data containers with counts and availability information. Useful for debugging and monitoring the state of the pipeline.
Returns
- dict
Summary dictionary with data container information
Examples
>>> pipeline_data = PipelineData()
>>> summary = pipeline_data.get_data_summary()
>>> print(summary['trajectories_loaded'])
>>> print(summary['features_computed'])
- add_custom_metadata(name: str, value: Any, overwrite: bool = False) -> None
Register custom metadata payload in the pipeline state.
Parameters
- name : str
Metadata key.
- value : Any
Metadata payload to store.
- overwrite : bool, default=False
If False, existing keys raise ValueError.
Returns
- None
Stores metadata in-place.
- get_custom_metadata(name: str) -> Any
Get a previously registered custom metadata payload.
Parameters
- name : str
Metadata key.
Returns
- Any
Stored payload.
Raises
- ValueError
If the key does not exist.
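The overwrite and lookup rules described for add_custom_metadata and get_custom_metadata can be sketched with a plain dictionary. MetadataStore below is a hypothetical stand-in written for illustration, not the mdxplain implementation; only the documented behavior (ValueError on an existing key unless overwrite=True, ValueError on a missing key) is taken from the reference above.

```python
from typing import Any, Dict


class MetadataStore:
    """Hypothetical stand-in that mimics the documented metadata semantics."""

    def __init__(self) -> None:
        self._custom_metadata: Dict[str, Any] = {}

    def add(self, name: str, value: Any, overwrite: bool = False) -> None:
        # Existing keys are protected unless the caller opts in to overwriting.
        if name in self._custom_metadata and not overwrite:
            raise ValueError(f"Metadata key '{name}' already exists")
        self._custom_metadata[name] = value

    def get(self, name: str) -> Any:
        # Missing keys raise ValueError, matching the documented contract.
        if name not in self._custom_metadata:
            raise ValueError(f"Metadata key '{name}' does not exist")
        return self._custom_metadata[name]


store = MetadataStore()
store.add("force_field", "amber99sb")                 # first registration
store.add("force_field", "charmm36", overwrite=True)  # explicit replacement
print(store.get("force_field"))
```

Keeping overwrite=False as the default means an accidental key collision surfaces as an error instead of silently replacing earlier metadata.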
- has_trajectories() -> bool
Check if trajectory data is available.
Returns
- bool
True if trajectories are loaded, False otherwise
- has_features() -> bool
Check if any feature data is available.
Returns
- bool
True if features are computed, False otherwise
- has_clusterings() -> bool
Check if any clustering results are available.
Returns
- bool
True if clustering results exist, False otherwise
- has_decompositions() -> bool
Check if any decomposition results are available.
Returns
- bool
True if decomposition results exist, False otherwise
- get_feature(feature_type: str | Any) -> Any
Retrieve a computed feature by its type.
This method returns the FeatureData instance for a previously computed feature. The returned object provides access to the computed data, feature names, analysis methods, and data reduction capabilities.
Supports three input variants:
feature_type.Distances() (instance)
feature_type.Distances (class with metaclass)
"distances" (string)
Parameters
- feature_type : FeatureTypeBase, FeatureTypeBase class, or str
Feature type instance, class, or string (e.g., Distances(), Distances, "distances")
Returns
- FeatureData
The FeatureData instance containing computed data and analysis methods
Raises
- ValueError
If the requested feature type has not been computed yet
Examples
>>> # Get distances feature - all variants work:
>>> distances = pipeline_data.get_feature("distances")
>>> distance_data = distances.get_data()
>>> feature_names = distances.get_feature_names()

>>> # Get contacts and apply analysis
>>> contacts = pipeline_data.get_feature("contacts")
>>> frequency = contacts.analysis.compute_frequency()
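One way to accept an instance, a class, or a string for the same lookup is to normalize all three to a single dictionary key. The sketch below is an assumption about how such normalization could look: the Distances class, its type_name attribute, and the feature_key helper are hypothetical, and the real library is documented as using a metaclass for the class variant instead.

```python
from typing import Any


class Distances:
    """Hypothetical feature type used only for this sketch."""

    type_name = "distances"  # class attribute, visible on class and instance


def feature_key(feature_type: Any) -> str:
    """Map an instance, a class, or a string to one lookup key."""
    if isinstance(feature_type, str):
        return feature_type
    # Attribute lookup works for both Distances and Distances().
    return feature_type.type_name


keys = [feature_key(Distances()), feature_key(Distances), feature_key("distances")]
print(keys)
```

All three variants resolve to the same key, so a single dictionary such as feature_data can serve every call style.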
- get_decomposition(decomposition_name: str)
Retrieve a computed decomposition by selection name.
This method returns the DecompositionData instance for a previously computed decomposition. The returned object provides access to the decomposed data, metadata, hyperparameters, and transformation details.
Parameters
- decomposition_name : str
Name of the decomposition
Returns
- DecompositionData
The DecompositionData instance containing decomposed data and metadata
Raises
- ValueError
If the requested decomposition has not been computed yet
Examples
>>> # Get decomposition for a selection
>>> decomp_data = pipeline_data.get_decomposition("feature_sel")
>>> transformed = decomp_data.get_data()
>>> metadata = decomp_data.get_metadata()

>>> # Get decomposition type from metadata
>>> decomp_type = decomp_data.metadata.get('decomposition_type', 'unknown')
>>> print(f"Decomposition type: {decomp_type}")
- list_decompositions()
List all computed decompositions.
Returns a list of all computed decompositions with their selection names and decomposition types for easy overview.
Parameters
None
Returns
- list
List of dictionaries containing decomposition information
Examples
>>> decompositions = pipeline_data.list_decompositions()
>>> for decomp in decompositions:
...     print(f"Selection: {decomp['decomposition_name']}, Type: {decomp['type']}")
- get_cluster(cluster_name: str)
Retrieve a computed clustering result by cluster name.
This method returns the ClusterData instance for a previously computed clustering analysis. The returned object provides access to the cluster labels, metadata, and clustering parameters.
Parameters
- cluster_name : str
Name of the clustering result to retrieve
Returns
- ClusterData
The ClusterData instance containing cluster labels and metadata
Raises
- ValueError
If the requested clustering result has not been computed yet
Examples
>>> # Get clustering result by name
>>> cluster_data = pipeline_data.get_cluster("dbscan_analysis")
>>> labels = cluster_data.labels
>>> metadata = cluster_data.metadata

>>> # Get clustering result with default name
>>> cluster_data = pipeline_data.get_cluster("dbscan_eps0.5_min5")
>>> labels = cluster_data.labels
>>> # Count clusters, excluding the noise label -1
>>> n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
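The cluster-count expression in the example above can be checked on a small label array. The labels below are made up for illustration, and the sketch assumes that -1 marks noise frames, as is the convention for DBSCAN-style labels.

```python
from collections import Counter

# Hypothetical label array: three clusters (0, 1, 2) plus two noise frames (-1).
labels = [0, 0, 1, -1, 1, 2, -1, 0]

# Distinct labels minus the noise label, if present.
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)

# Number of frames that were not assigned to any cluster.
n_noise = labels.count(-1)

# Frames per cluster, ignoring noise.
cluster_sizes = Counter(label for label in labels if label != -1)

print(n_clusters, n_noise, dict(cluster_sizes))
```

Subtracting one only when -1 is present keeps the count correct for algorithms that never emit a noise label.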
- list_clusters()
List all computed clustering results.
Returns a list of all computed clustering results with their names and basic information for easy overview.
Parameters
None
Returns
- list
List of dictionaries containing clustering information
Examples
>>> clusters = pipeline_data.list_clusters()
>>> for cluster in clusters:
...     print(f"Name: {cluster['name']}, Type: {cluster['type']}, "
...           f"Clusters: {cluster['n_clusters']}")
- save(save_path: str) -> None
Save the complete PipelineData object to disk.
This method serializes the entire PipelineData object including all computed features, trajectories, clusterings, decompositions, and metadata to a file. The saved object can be loaded later to restore the complete analysis state without recomputation.
Parameters
- save_path : str
Path where to save the PipelineData object. Should have a .pkl extension. The directory will be created if it doesn’t exist.
Returns
- None
Saves the PipelineData object to the specified path
Examples
>>> # Save after computing features
>>> pipeline_data.save('analysis_results/pipeline_data.pkl')

>>> # Save with specific path structure
>>> from pathlib import Path
>>> save_dir = Path('project_results/session_001')
>>> save_dir.mkdir(parents=True, exist_ok=True)
>>> pipeline_data.save(f'{save_dir}/pipeline_analysis.pkl')
Notes
All computed features, clusterings, and decompositions are saved
Memory-mapped data files remain separate and are referenced
Complete pipeline state is preserved including configuration
- load(load_path: str) -> None
Load a previously saved PipelineData object from disk.
This method deserializes a PipelineData object from a file, restoring all computed features, trajectories, and analysis state. After loading, the object is ready for immediate use without requiring recomputation.
Parameters
- load_path : str
Path to the saved PipelineData file (.pkl). The file must have been created using the save() method.
Returns
- None
Loads the PipelineData object from the specified path
Examples
>>> # Load previously saved analysis
>>> pipeline_data = PipelineData()
>>> pipeline_data.load('analysis_results/pipeline_data.pkl')
>>>
>>> # Access loaded features immediately
>>> distances = pipeline_data.get_feature("distances")
>>> contacts = pipeline_data.get_feature("contacts")
>>>
>>> # Continue analysis where you left off
>>> mean_distances = distances.analysis.compute_mean()
Raises
- FileNotFoundError
If the specified file doesn’t exist
- ValueError
If the file is corrupted or not a valid PipelineData save file
Notes
All previously computed features are restored
Memory mapping settings and cache paths are preserved
If memory-mapped data files are missing, an error will occur
Complete pipeline state including configuration is restored
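The save and load round trip can be pictured with a generic pickle-based sketch. This is an assumption based only on the .pkl extension mentioned above; the helpers save_object and load_object and the state dictionary are hypothetical, and the actual on-disk format used by PipelineData may differ.

```python
import pickle
import tempfile
from pathlib import Path


def save_object(obj, save_path: str) -> None:
    """Pickle obj to save_path, creating the parent directory if needed."""
    path = Path(save_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("wb") as handle:
        pickle.dump(obj, handle)


def load_object(load_path: str):
    """Unpickle an object, failing early if the file is missing."""
    path = Path(load_path)
    if not path.exists():
        raise FileNotFoundError(f"No saved state at {path}")
    with path.open("rb") as handle:
        return pickle.load(handle)


# Hypothetical stand-in for a pipeline state.
state = {"chunk_size": 2000, "use_memmap": False, "features": ["distances"]}

with tempfile.TemporaryDirectory() as tmp:
    target = str(Path(tmp) / "session_001" / "pipeline_data.pkl")
    save_object(state, target)       # the session_001 directory is created here
    restored = load_object(target)

print(restored == state)
```

Note that the documented load() returns None and restores state into the existing object, whereas this sketch returns the loaded object for brevity. Pickle files should only be loaded from trusted sources, because unpickling can execute arbitrary code.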
- get_selected_metadata(name: str)
Return metadata for all selected features.
This method retrieves the metadata for all features in a selection, providing detailed information about each column in the corresponding selected matrix. The metadata includes feature definitions and types, allowing for proper interpretation of the selected data.
The reference trajectory for metadata is determined by the one specified during the FeatureSelector.select() operation.
Parameters
- name : str
Name of the selection to retrieve
Returns
- numpy.ndarray
Array of dictionaries, one for each column in the selected matrix. Each dictionary has the structure:
{ 'features': original feature metadata entry, 'type': feature type name as string }
Raises
- ValueError
If selection not found or no metadata available
Examples
>>> # Get metadata for a selection
>>> metadata = pipeline_data.get_selected_metadata("ala_analysis")
>>> print(f"Number of selected features: {len(metadata)}")
>>>
>>> # Examine first feature
>>> first_feature = metadata[0]
>>> print(f"Feature type: {first_feature['type']}")
>>> print(f"Feature details: {first_feature['features']}")
- validate_selection_exists(name: str)
Validate that the selection exists.
Parameters
- name : str
Name of the selection to validate
Returns
None
Raises
- ValueError
If the selection does not exist
- get_selected_data(feature_selector: str, data_selector: str | None = None, return_frame_mapping: bool = False)
Get data matrix with selected features and optionally selected frames.
This method combines feature selection (columns) and data selection (rows) to create a matrix with the desired subset of data. Feature selection is required to define which columns to include.
Frame mapping is only created when explicitly requested.
Parameters
- feature_selector : str
Name of the feature selector (which columns to include). Must be provided - cannot be None.
- data_selector : str, optional
Name of the data selector (which rows to include). If None, uses all available frames.
- return_frame_mapping : bool, default=False
Whether to return frame mapping along with the matrix
Returns
- np.ndarray or Tuple[np.ndarray, Dict[int, tuple]]
If return_frame_mapping=False: Matrix with selected columns and optionally selected rows. If return_frame_mapping=True: Tuple of (matrix, frame_mapping).
Matrix shapes:
With data_selector: (n_selected_frames, n_selected_features)
Without data_selector: (n_all_frames, n_selected_features)
Frame mapping: {global_frame_index: (trajectory_index, local_frame_index)}
Raises
- ValueError
If feature_selector doesn’t exist, data_selector doesn’t exist, or no data available
Examples
>>> # Get data with both feature and frame selection
>>> data = pipeline_data.get_selected_data(
...     feature_selector="key_distances",
...     data_selector="folded_frames"
... )
>>> print(f"Selected data shape: {data.shape}")

>>> # Get all frames but only selected features with mapping
>>> data, mapping = pipeline_data.get_selected_data(
...     feature_selector="important_features",
...     return_frame_mapping=True
... )
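The documented frame mapping format, {global_frame_index: (trajectory_index, local_frame_index)}, can be illustrated for the case without a data selector. The helper build_frame_mapping below is hypothetical and assumes that rows are ordered trajectory by trajectory, which the reference above does not state explicitly.

```python
from typing import Dict, List, Tuple


def build_frame_mapping(frames_per_trajectory: List[int]) -> Dict[int, Tuple[int, int]]:
    """Map each global row index to (trajectory_index, local_frame_index)."""
    mapping: Dict[int, Tuple[int, int]] = {}
    global_index = 0
    for traj_index, n_frames in enumerate(frames_per_trajectory):
        for local_index in range(n_frames):
            mapping[global_index] = (traj_index, local_index)
            global_index += 1
    return mapping


# Hypothetical input: trajectory 0 has 3 frames, trajectory 1 has 2 frames.
mapping = build_frame_mapping([3, 2])
print(mapping)
```

With such a mapping, row 3 of the returned matrix can be traced back to frame 0 of trajectory 1.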
- get_centroid_frame(feature_selector: str, data_selector: str) -> Tuple[int, int]
Find frame closest to DataSelector centroid.
Computes the centroid (mean) of all frames in a DataSelector and finds the frame closest to this centroid in feature space. This is a generic operation used across multiple modules.
Uses pipeline_data’s use_memmap and chunk_size settings for processing configuration.
Parameters
- feature_selector : str
Name of feature selector to use for distance calculation
- data_selector : str
Name of DataSelector to find centroid for
Returns
- Tuple[int, int]
(trajectory_index, frame_index) of centroid frame
Examples
>>> # Find centroid frame for a cluster
>>> traj_idx, frame_idx = pipeline_data.get_centroid_frame(
...     "my_features", "cluster_0"
... )
Notes
Centroid is the mean of all frames in the DataSelector
Closest frame minimizes Euclidean distance to centroid
Uses pipeline’s use_memmap and chunk_size configuration
Generic operation usable by clustering, feature importance, etc.
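The two steps in the notes above (mean of all frames, then the frame with minimal Euclidean distance to that mean) can be sketched in plain Python. The function centroid_frame and its inputs are hypothetical; the sketch ignores the memmap and chunked processing that the real method is documented to use.

```python
import math
from typing import Sequence, Tuple


def centroid_frame(
    rows: Sequence[Sequence[float]],
    frame_ids: Sequence[Tuple[int, int]],
) -> Tuple[int, int]:
    """Return the (trajectory_index, frame_index) of the row nearest the mean."""
    n_rows = len(rows)
    n_cols = len(rows[0])
    # Centroid: column-wise mean over all selected frames.
    centroid = [sum(row[j] for row in rows) / n_rows for j in range(n_cols)]
    # Euclidean distance of every frame to the centroid.
    distances = [math.dist(row, centroid) for row in rows]
    closest = distances.index(min(distances))
    return frame_ids[closest]


# Hypothetical feature rows and their (trajectory_index, frame_index) origins.
rows = [[0.0, 0.0], [1.0, 1.0], [4.0, 4.0], [2.0, 1.0]]
frame_ids = [(0, 0), (0, 1), (1, 0), (1, 1)]
result = centroid_frame(rows, frame_ids)
print(result)
```

Here the centroid is (1.75, 1.5), and the last row, which originates from frame 1 of trajectory 1, lies closest to it.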
- get_comparison_data(comparison_name: str, sub_comparison_name: str) -> Tuple[ndarray, ndarray]
Get X (features) and y (labels) for a specific comparison sub-comparison.
This method provides the central access point for comparison data, combining ComparisonData metadata with efficient data processing. Used by modules to get ready-to-use datasets for analysis.
Parameters
- comparison_name : str
Name of the comparison to retrieve data from
- sub_comparison_name : str
Name of the specific sub-comparison within the comparison
Returns
- Tuple[np.ndarray, np.ndarray]
Tuple of (X, y) where:
X is the feature matrix with selected features and frames
y is the label array for the comparison groups
Raises
- ValueError
If comparison not found, sub-comparison not found, or no data available
Examples
>>> import numpy as np
>>> # Get data for a binary comparison
>>> X, y = pipeline_data.get_comparison_data("folded_vs_unfolded", "main")
>>> print(f"Features shape: {X.shape}")
>>> print(f"Labels: {np.unique(y)}")

>>> # Get data for one-vs-rest comparison
>>> X, y = pipeline_data.get_comparison_data("conformations", "folded_vs_rest")
>>> print(f"Data shape: {X.shape}, Labels: {np.unique(y)}")
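For a one-vs-rest sub-comparison such as "folded_vs_rest", the label array y separates one group from all others. The sketch below shows one possible encoding; the helper one_vs_rest_labels, the group names, and the choice of 1 for the target group and 0 for the rest are all assumptions, since the reference above does not specify the label values.

```python
from typing import List


def one_vs_rest_labels(frame_groups: List[str], target: str) -> List[int]:
    """Label frames of the target group 1 and every other frame 0 (assumed encoding)."""
    return [1 if group == target else 0 for group in frame_groups]


# Hypothetical group assignment for four frames.
frame_groups = ["folded", "unfolded", "intermediate", "folded"]
y = one_vs_rest_labels(frame_groups, "folded")
print(y)
```

Each entry of y lines up with one row of X, so the pair can be passed directly to a classifier.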
- get_config() -> dict
Get current configuration parameters.
Returns the current configuration settings for chunk_size, cache_dir, and use_memmap that are used across the pipeline.
Returns
- dict
Dictionary containing current configuration values
Examples
Check current configuration:
>>> pipeline_data = PipelineData(chunk_size=1000, use_memmap=True)
>>> config = pipeline_data.get_config()
>>> print(config['chunk_size'])  # 1000
>>> print(config['use_memmap'])  # True
- clear_matrix_cache(feature_selector: str | None = None, data_selector: str | None = None) -> None
Clear matrix cache when data changes.
This method clears cached memmap matrices to ensure fresh data is used after modifications. Only affects cached matrices when use_memmap=True.
Parameters
- feature_selector : str, optional
If provided, only clear cache for this feature selector.
- data_selector : str, optional
If provided, only clear cache using this data selector.
Returns
- None
Clears matching cache entries
Examples
>>> # Clear specific combination
>>> pipeline_data.clear_matrix_cache("contacts_only", "folded")

>>> # Clear all with feature selector
>>> pipeline_data.clear_matrix_cache("contacts_only")

>>> # Clear all with data selector
>>> pipeline_data.clear_matrix_cache(data_selector="folded")

>>> # Clear all cached matrices
>>> pipeline_data.clear_matrix_cache()
Notes
Both None: clears entire cache
Only feature_selector: clears all with this feature_selector
Only data_selector: clears all using this data_selector
Both specified: clears specific combination
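The four clearing rules in the notes above can be sketched as a filter over a cache keyed by (feature_selector, data_selector) pairs. The key layout and the standalone function below are assumptions made for illustration; the actual cache structure inside PipelineData is not described in this reference.

```python
from typing import Dict, Optional, Tuple

CacheKey = Tuple[str, Optional[str]]


def clear_matrix_cache(
    cache: Dict[CacheKey, object],
    feature_selector: Optional[str] = None,
    data_selector: Optional[str] = None,
) -> None:
    """Delete every entry that matches all filters that were provided."""
    for key in list(cache):  # copy the keys so entries can be deleted safely
        cached_feature, cached_data = key
        if feature_selector is not None and cached_feature != feature_selector:
            continue
        if data_selector is not None and cached_data != data_selector:
            continue
        del cache[key]


# Hypothetical cache contents.
cache: Dict[CacheKey, object] = {
    ("contacts_only", "folded"): "matrix A",
    ("contacts_only", "unfolded"): "matrix B",
    ("distances_only", "folded"): "matrix C",
    ("distances_only", None): "matrix D",
}

clear_matrix_cache(cache, data_selector="folded")
after_data_selector = set(cache)     # A and C removed

clear_matrix_cache(cache, feature_selector="contacts_only")
after_feature_selector = set(cache)  # B removed

clear_matrix_cache(cache)
after_full_clear = set(cache)        # everything removed
```

Treating None as "match anything" lets a single loop cover all four cases without separate branches.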