Pipeline Entities

Pipeline entities module.

Contains data container classes for the pipeline orchestration system.

Pipeline Data

Central data container for the Pipeline orchestration system.

This module provides the PipelineData class that serves as the central data container orchestrating all analysis data including trajectories, features, clustering results, and decomposition results.

class mdxplain.pipeline.entities.pipeline_data.PipelineData(use_memmap: bool = False, cache_dir: str = './cache', chunk_size: int = 2000, dtype: type = numpy.float32, max_memory_gb: float = 6.0)

Central data container orchestrating all analysis data.

This class serves as the central data hub for the pipeline system. It holds all trajectory data, computed features, clustering results, and decomposition results, and is where the results of future analysis modules will be stored.

PipelineData is the central “God-Object” that is passed to the managers. The managers populate it step by step, following the builder pattern, which keeps the analysis logic (managers) separate from the stored state (PipelineData).

Attributes

use_memmap : bool

Whether to use memory mapping for large datasets

cache_dir : str

Directory for cache files when using memory mapping

chunk_size : int

Chunk size for memory-efficient processing

dtype : type

Data type for feature matrices (float32 or float64)

max_memory_gb : float

Estimated maximum memory usage in GB for current data

trajectory_data : TrajectoryData

Container for trajectory data and metadata

feature_data : Dict[str, Dict[int, FeatureData]]

Nested dictionary of computed features by type and trajectory index

selected_feature_data : Dict[str, FeatureSelectorData]

Dictionary of feature selection results by selection name

decomposition_data : Dict[str, DecompositionData]

Dictionary of decomposition results by selection name

cluster_data : Dict[str, ClusterData]

Dictionary of clustering results by cluster name

data_selector_data : Dict[str, DataSelectorData]

Dictionary of data selector results by selector name

data_selector_groups : Dict[str, DataSelectorGroup]

Dictionary of data selector groups by group name

comparison_data : Dict[str, ComparisonData]

Dictionary of comparison results by comparison name

feature_importance_data : Dict[str, FeatureImportanceData]

Dictionary of feature importance results by analysis name

structure_visualization_data : Dict[str, StructureVisualizationData]

Dictionary of structure visualization data by session name

custom_metadata : Dict[str, Any]

User-defined custom metadata attached to the pipeline state

Examples

Pipeline mode (automatic):

>>> pipeline = PipelineManager()
>>> # PipelineData is managed automatically
>>> pipeline.trajectory.load_trajectories('../data')

Standalone mode (manual):

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.load_trajectories(pipeline_data, '../data')
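
The two modes differ only in who supplies the data container. The sketch below illustrates that idea with hypothetical stand-in classes (ToyData, ToyTrajectoryManager, and ToyPipeline are illustrative names, not part of mdxplain). It assumes the pipeline simply pre-binds its own container as the first argument of each manager method; the real PipelineManager may wire this differently.

```python
from functools import partial


class ToyData:
    """Stand-in for a central data container (hypothetical)."""

    def __init__(self):
        self.trajectories = []


class ToyTrajectoryManager:
    """Stand-in manager: every method takes the data container first."""

    def load_trajectories(self, data, path):
        data.trajectories.append(path)


class ToyPipeline:
    """Owns one container and pre-binds it to the manager's methods."""

    def __init__(self):
        self.data = ToyData()
        self._manager = ToyTrajectoryManager()
        # Pipeline mode: the container is injected automatically.
        self.load_trajectories = partial(
            self._manager.load_trajectories, self.data
        )


# Pipeline mode: no container argument is needed.
pipeline = ToyPipeline()
pipeline.load_trajectories("../data")

# Standalone mode: the caller passes the container explicitly.
standalone_data = ToyData()
ToyTrajectoryManager().load_trajectories(standalone_data, "../data")
```

In both cases the manager runs the same code; only the origin of the container changes.
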

__init__(use_memmap: bool = False, cache_dir: str = './cache', chunk_size: int = 2000, dtype: type = numpy.float32, max_memory_gb: float = 6.0)

Initialize the central pipeline data container.

Creates empty containers for all analysis data types that will be populated through the respective manager interfaces.

Parameters

use_memmap : bool, default=False

Whether to use memory mapping for large datasets

cache_dir : str, default="./cache"

Directory for cache files when using memory mapping

chunk_size : int, default=2000

Chunk size for memory-efficient processing

dtype : type, default=np.float32

Data type for feature matrices (float32 or float64). float32 uses half the memory of float64 and is sufficient for most MD analysis. Use float64 only if extreme numerical precision is required.

max_memory_gb : float, default=6.0

Maximum memory in GB for dataset processing. Used for memory-aware sampling in algorithms like DecisionTree. Datasets exceeding this limit will be automatically sampled.

Returns

None

Initializes PipelineData instance with empty data containers
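
The interplay of dtype and max_memory_gb can be checked with simple arithmetic. The following is a rough sketch, not the library's sampling logic: matrix_size_gb and max_rows_within_budget are hypothetical helpers, and the calculation assumes a dense matrix with 4 bytes per float32 value, 8 bytes per float64 value, and 1 GB = 1024**3 bytes.

```python
FLOAT32_BYTES = 4  # bytes per numpy.float32 value
FLOAT64_BYTES = 8  # bytes per numpy.float64 value
BYTES_PER_GB = 1024**3


def matrix_size_gb(n_frames: int, n_features: int, itemsize: int) -> float:
    """Size of a dense n_frames x n_features matrix in GB."""
    return n_frames * n_features * itemsize / BYTES_PER_GB


def max_rows_within_budget(n_features: int, itemsize: int, max_memory_gb: float) -> int:
    """Largest number of rows whose dense matrix fits into the budget."""
    return int(max_memory_gb * BYTES_PER_GB // (n_features * itemsize))


# Example: 100,000 frames with 20,000 features each.
size32 = matrix_size_gb(100_000, 20_000, FLOAT32_BYTES)
size64 = matrix_size_gb(100_000, 20_000, FLOAT64_BYTES)

# Rows that would fit into the default 6 GB budget with float32.
rows = max_rows_within_budget(20_000, FLOAT32_BYTES, max_memory_gb=6.0)
```

Under these assumptions the float32 matrix already exceeds 6 GB, so roughly 80,000 of the 100,000 frames would fit and the rest would have to be dropped by sampling.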

clear_all_data() -> None

Clear all stored analysis data.

Resets all data containers to empty state, effectively clearing all computed results while preserving the container structure. Useful for starting fresh analysis or freeing memory.

Returns

None

Clears all data containers in-place

Examples

>>> pipeline_data = PipelineData()
>>> # ... after computations ...
>>> pipeline_data.clear_all_data()

update_max_memory_from_trajectories(max_atoms: int) -> None

Update memory estimate after trajectory loading.

Parameters

max_atoms : int

Maximum number of atoms across all trajectories

Returns

None

Updates max_memory_gb based on atom count

update_max_memory_from_features(n_features: int) -> None

Update memory estimate after feature computation.

Parameters

n_features : int

Actual number of features computed

Returns

None

Updates max_memory_gb based on actual feature count

get_data_summary() -> Dict[str, Any]

Get summary information about all stored data.

Provides an overview of all data containers with counts and availability information. Useful for debugging and monitoring the state of the pipeline.

Returns

dict

Summary dictionary with data container information

Examples

>>> pipeline_data = PipelineData()
>>> summary = pipeline_data.get_data_summary()
>>> print(summary['trajectories_loaded'])
>>> print(summary['features_computed'])

add_custom_metadata(name: str, value: Any, overwrite: bool = False) -> None

Register custom metadata payload in the pipeline state.

Parameters

name : str

Metadata key.

value : Any

Metadata payload to store.

overwrite : bool, default=False

If False, adding a key that already exists raises ValueError. If True, the existing value is replaced.

Returns

None

Stores metadata in-place.

get_custom_metadata(name: str) -> Any

Get a previously registered custom metadata payload.

Parameters

name : str

Metadata key.

Returns

Any

Stored payload.

Raises

ValueError

If the key does not exist.
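
The documented contract of add_custom_metadata and get_custom_metadata can be mirrored with a plain dictionary. This is a minimal sketch under that assumption; MetadataStore is a hypothetical stand-in, and the error messages are invented for illustration.

```python
from typing import Any, Dict


class MetadataStore:
    """Hypothetical stand-in mirroring the documented add/get behaviour."""

    def __init__(self) -> None:
        self.custom_metadata: Dict[str, Any] = {}

    def add_custom_metadata(self, name: str, value: Any, overwrite: bool = False) -> None:
        # Refuse to replace an existing key unless explicitly allowed.
        if name in self.custom_metadata and not overwrite:
            raise ValueError(f"Metadata key '{name}' already exists.")
        self.custom_metadata[name] = value

    def get_custom_metadata(self, name: str) -> Any:
        # Unknown keys raise ValueError, as documented (not KeyError).
        if name not in self.custom_metadata:
            raise ValueError(f"Metadata key '{name}' does not exist.")
        return self.custom_metadata[name]


store = MetadataStore()
store.add_custom_metadata("force_field", "amber99sb")
store.add_custom_metadata("force_field", "charmm36", overwrite=True)

try:
    store.add_custom_metadata("force_field", "opls")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```

Callers that are unsure whether a key exists should catch ValueError rather than KeyError, since that is the exception named above.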

has_trajectories() -> bool

Check if trajectory data is available.

Returns

bool

True if trajectories are loaded, False otherwise

has_features() -> bool

Check if any feature data is available.

Returns

bool

True if features are computed, False otherwise

has_clusterings() -> bool

Check if any clustering results are available.

Returns

bool

True if clustering results exist, False otherwise

has_decompositions() -> bool

Check if any decomposition results are available.

Returns

bool

True if decomposition results exist, False otherwise

get_feature(feature_type: str | Any) -> Any

Retrieve a computed feature by its type.

This method returns the FeatureData instance for a previously computed feature. The returned object provides access to the computed data, feature names, analysis methods, and data reduction capabilities.

Supports three input variants:

  • feature_type.Distances() (instance)

  • feature_type.Distances (class with metaclass)

  • "distances" (string)

Parameters

feature_type : FeatureTypeBase, FeatureTypeBase class, or str

Feature type instance, class, or string (e.g., Distances(), Distances, "distances")

Returns

FeatureData

The FeatureData instance containing computed data and analysis methods

Raises

ValueError

If the requested feature type has not been computed yet

Examples

>>> # Get distances feature by string (the instance and class variants work the same way)
>>> distances = pipeline_data.get_feature("distances")
>>> distance_data = distances.get_data()
>>> feature_names = distances.get_feature_names()
>>> # Get contacts and apply analysis
>>> contacts = pipeline_data.get_feature("contacts")
>>> frequency = contacts.analysis.compute_frequency()
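
One way a three-variant lookup like this can work is to reduce every variant to the same string key. The sketch below is an assumption for illustration, not mdxplain's implementation: FeatureTypeMeta, the toy Distances class, and to_feature_key are hypothetical names.

```python
class FeatureTypeMeta(type):
    """Metaclass that gives the class itself (not only instances) a string key."""

    def __str__(cls) -> str:
        return cls.__name__.lower()


class Distances(metaclass=FeatureTypeMeta):
    """Toy feature type used only for this illustration."""

    def __str__(self) -> str:
        return type(self).__name__.lower()


def to_feature_key(feature_type) -> str:
    """Map an instance, a class, or a plain string to one dictionary key."""
    return str(feature_type)


# All three variants collapse to the same key.
keys = {
    to_feature_key(Distances()),
    to_feature_key(Distances),
    to_feature_key("distances"),
}
```

The metaclass is what makes the bare class usable as a key: without it, str(Distances) would yield the default class representation instead of "distances".
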

get_decomposition(decomposition_name: str)

Retrieve a computed decomposition by selection name.

This method returns the DecompositionData instance for a previously computed decomposition. The returned object provides access to the decomposed data, metadata, hyperparameters, and transformation details.

Parameters

decomposition_name : str

Name of the decomposition

Returns

DecompositionData

The DecompositionData instance containing decomposed data and metadata

Raises

ValueError

If the requested decomposition has not been computed yet

Examples

>>> # Get decomposition for a selection
>>> decomp_data = pipeline_data.get_decomposition("feature_sel")
>>> transformed = decomp_data.get_data()
>>> metadata = decomp_data.get_metadata()
>>> # Get decomposition type from metadata
>>> decomp_type = decomp_data.metadata.get('decomposition_type', 'unknown')
>>> print(f"Decomposition type: {decomp_type}")

list_decompositions()

List all computed decompositions.

Returns a list of all computed decompositions with their selection names and decomposition types for easy overview.

Parameters

None

Returns

list

List of dictionaries containing decomposition information

Examples

>>> decompositions = pipeline_data.list_decompositions()
>>> for decomp in decompositions:
...     print(f"Selection: {decomp['decomposition_name']}, Type: {decomp['type']}")

get_cluster(cluster_name: str)

Retrieve a computed clustering result by cluster name.

This method returns the ClusterData instance for a previously computed clustering analysis. The returned object provides access to the cluster labels, metadata, and clustering parameters.

Parameters

cluster_name : str

Name of the clustering result to retrieve

Returns

ClusterData

The ClusterData instance containing cluster labels and metadata

Raises

ValueError

If the requested clustering result has not been computed yet

Examples

>>> # Get clustering result by name
>>> cluster_data = pipeline_data.get_cluster("dbscan_analysis")
>>> labels = cluster_data.labels
>>> metadata = cluster_data.metadata
>>> # Get clustering result with default name
>>> cluster_data = pipeline_data.get_cluster("dbscan_eps0.5_min5")
>>> labels = cluster_data.labels
>>> # Count clusters, excluding the noise label -1
>>> n_clusters = len(set(labels)) - (1 if -1 in labels else 0)

list_clusters()

List all computed clustering results.

Returns a list of all computed clustering results with their names and basic information for easy overview.

Parameters

None

Returns

list

List of dictionaries containing clustering information

Examples

>>> clusters = pipeline_data.list_clusters()
>>> for cluster in clusters:
...     print(f"Name: {cluster['name']}, Type: {cluster['type']}, "
...           f"Clusters: {cluster['n_clusters']}")

save(save_path: str) -> None

Save the complete PipelineData object to disk.

This method serializes the entire PipelineData object including all computed features, trajectories, clusterings, decompositions, and metadata to a file. The saved object can be loaded later to restore the complete analysis state without recomputation.

Parameters

save_path : str

Destination path for the PipelineData object. Should have a .pkl extension. The directory is created if it doesn’t exist.

Returns

None

Saves the PipelineData object to the specified path

Examples

>>> # Save after computing features
>>> pipeline_data.save('analysis_results/pipeline_data.pkl')
>>> # Save with specific path structure
>>> from pathlib import Path
>>> save_dir = Path('project_results/session_001')
>>> save_dir.mkdir(parents=True, exist_ok=True)
>>> pipeline_data.save(str(save_dir / 'pipeline_analysis.pkl'))

Notes

  • All computed features, clusterings, and decompositions are saved

  • Memory-mapped data files are not embedded in the saved file; they remain separate and are only referenced

  • Complete pipeline state is preserved including configuration

load(load_path: str) -> None

Load a previously saved PipelineData object from disk.

This method deserializes a PipelineData object from a file, restoring all computed features, trajectories, and analysis state. After loading, the object is ready for immediate use without requiring recomputation.

Parameters

load_path : str

Path to the saved PipelineData file (.pkl). The file must have been created using the save() method.

Returns

None

Loads the PipelineData object from the specified path

Examples

>>> # Load previously saved analysis
>>> pipeline_data = PipelineData()
>>> pipeline_data.load('analysis_results/pipeline_data.pkl')
>>>
>>> # Access loaded features immediately
>>> distances = pipeline_data.get_feature("distances")
>>> contacts = pipeline_data.get_feature("contacts")
>>>
>>> # Continue analysis where you left off
>>> mean_distances = distances.analysis.compute_mean()

Raises

FileNotFoundError

If the specified file doesn’t exist

ValueError

If the file is corrupted or not a valid PipelineData save file

Notes

  • All previously computed features are restored

  • Memory mapping settings and cache paths are preserved

  • If memory-mapped data files are missing, an error will occur

  • Complete pipeline state including configuration is restored
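
The save/load round trip can be pictured with the standard pickle module. This is a sketch under the assumption that the .pkl file is an ordinary pickle, which the extension suggests but this page does not confirm; save_state and load_state are hypothetical helpers operating on a plain dictionary rather than a PipelineData object.

```python
import pickle
import tempfile
from pathlib import Path


def save_state(state: dict, save_path: str) -> None:
    """Pickle `state`, creating the parent directory if needed."""
    path = Path(save_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("wb") as handle:
        pickle.dump(state, handle)


def load_state(load_path: str) -> dict:
    """Unpickle a state; raises FileNotFoundError if the file is missing."""
    with Path(load_path).open("rb") as handle:
        return pickle.load(handle)


with tempfile.TemporaryDirectory() as tmp:
    # The session directory does not exist yet; save_state creates it.
    target = str(Path(tmp) / "session_001" / "state.pkl")
    original = {"chunk_size": 2000, "cluster_names": ["dbscan_analysis"]}
    save_state(original, target)
    restored = load_state(target)
```

If the file is indeed a pickle, it should only be loaded from trusted sources, because unpickling can execute arbitrary code.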

get_selected_metadata(name: str)

Return metadata for all selected features.

This method retrieves the metadata for all features in a selection, providing detailed information about each column in the corresponding selected matrix. The metadata includes feature definitions and types, allowing for proper interpretation of the selected data.

The metadata is taken from the reference trajectory that was specified in the FeatureSelector.select() call.

Parameters

name : str

Name of the selection to retrieve

Returns

numpy.ndarray

Array of dictionaries, one for each column in the selected matrix. Each dictionary has the structure:

{
    'features': original feature metadata entry,
    'type': feature type name as string
}

Raises

ValueError

If selection not found or no metadata available

Examples

>>> # Get metadata for a selection
>>> metadata = pipeline_data.get_selected_metadata("ala_analysis")
>>> print(f"Number of selected features: {len(metadata)}")
>>>
>>> # Examine first feature
>>> first_feature = metadata[0]
>>> print(f"Feature type: {first_feature['type']}")
>>> print(f"Feature details: {first_feature['features']}")
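
Because each entry describes one matrix column, the metadata can be used to group or slice columns by feature type. The sketch below uses a plain list in place of the returned numpy.ndarray, and the content of each 'features' entry is invented for illustration, since its exact structure is not specified here.

```python
from collections import Counter

# Hypothetical metadata in the documented shape: one dict per matrix column.
metadata = [
    {"features": {"pair": ("ALA1", "ALA5")}, "type": "distances"},
    {"features": {"pair": ("ALA1", "ALA9")}, "type": "distances"},
    {"features": {"pair": ("ALA2", "ALA7")}, "type": "contacts"},
]

# Number of selected columns contributed by each feature type.
columns_per_type = Counter(entry["type"] for entry in metadata)

# Column indices of one feature type, e.g. to slice the selected matrix.
distance_columns = [
    index for index, entry in enumerate(metadata) if entry["type"] == "distances"
]
```

The resulting index list can be applied to the columns of the matrix returned for the same selection.
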

validate_selection_exists(name: str)

Validate that the selection exists.

Parameters

name : str

Name of the selection to validate

Returns

None

Raises

ValueError

If the selection does not exist

get_selected_data(feature_selector: str, data_selector: str | None = None, return_frame_mapping: bool = False)

Get data matrix with selected features and optionally selected frames.

This method combines feature selection (columns) and data selection (rows) to create a matrix with the desired subset of data. Feature selection is required to define which columns to include.

Frame mapping is only created when explicitly requested.

Parameters

feature_selector : str

Name of the feature selector (which columns to include). Must be provided - cannot be None.

data_selector : str, optional

Name of the data selector (which rows to include). If None, uses all available frames.

return_frame_mapping : bool, default=False

Whether to return frame mapping along with the matrix

Returns

np.ndarray or Tuple[np.ndarray, Dict[int, tuple]]

  • If return_frame_mapping=False: matrix with the selected columns and, if a data_selector is given, the selected rows.

  • If return_frame_mapping=True: tuple of (matrix, frame_mapping).

  • Matrix shapes:

    • With data_selector: (n_selected_frames, n_selected_features)

    • Without data_selector: (n_all_frames, n_selected_features)

  • Frame mapping: {global_frame_index: (trajectory_index, local_frame_index)}

Raises

ValueError

If feature_selector doesn’t exist, data_selector doesn’t exist, or no data available

Examples

>>> # Get data with both feature and frame selection
>>> data = pipeline_data.get_selected_data(
...     feature_selector="key_distances",
...     data_selector="folded_frames"
... )
>>> print(f"Selected data shape: {data.shape}")
>>> # Get all frames but only selected features with mapping
>>> data, mapping = pipeline_data.get_selected_data(
...     feature_selector="important_features",
...     return_frame_mapping=True
... )
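
The frame mapping translates a row index of the returned matrix back to its trajectory and local frame. A minimal sketch of how such a mapping can be built is shown below; build_frame_mapping is a hypothetical helper, and it assumes that no data_selector is used and that trajectories are concatenated in index order.

```python
from typing import Dict, List, Tuple


def build_frame_mapping(frames_per_trajectory: List[int]) -> Dict[int, Tuple[int, int]]:
    """Map each global row index to (trajectory_index, local_frame_index)."""
    mapping: Dict[int, Tuple[int, int]] = {}
    global_index = 0
    for trajectory_index, n_frames in enumerate(frames_per_trajectory):
        for local_index in range(n_frames):
            mapping[global_index] = (trajectory_index, local_index)
            global_index += 1
    return mapping


# Two trajectories with 3 and 2 frames give 5 matrix rows.
mapping = build_frame_mapping([3, 2])

# Row 3 of the matrix is the first frame of the second trajectory.
trajectory_index, local_frame_index = mapping[3]
```

With a data_selector, the same dictionary shape applies, but only the selected frames appear as values.
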

get_centroid_frame(feature_selector: str, data_selector: str) -> Tuple[int, int]

Find frame closest to DataSelector centroid.

Computes the centroid (mean) of all frames in a DataSelector and finds the frame closest to this centroid in feature space. This is a generic operation used across multiple modules.

Uses pipeline_data’s use_memmap and chunk_size settings for processing configuration.

Parameters

feature_selector : str

Name of feature selector to use for distance calculation

data_selector : str

Name of DataSelector to find centroid for

Returns

Tuple[int, int]

(trajectory_index, frame_index) of centroid frame

Examples

>>> # Find centroid frame for a cluster
>>> traj_idx, frame_idx = pipeline_data.get_centroid_frame(
...     "my_features", "cluster_0"
... )

Notes

  • Centroid is the mean of all frames in the DataSelector

  • Closest frame minimizes Euclidean distance to centroid

  • Uses pipeline’s use_memmap and chunk_size configuration

  • Generic operation usable by clustering, feature importance, etc.
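
The centroid search described in these notes can be sketched in a few lines of pure Python. This is an illustration of the technique only: centroid_frame is a hypothetical helper that works on small in-memory lists, whereas the actual method operates on the selected feature matrix and may process it in chunks.

```python
import math
from typing import Dict, List, Tuple


def centroid_frame(
    rows: List[List[float]], frame_mapping: Dict[int, Tuple[int, int]]
) -> Tuple[int, int]:
    """Return (trajectory_index, frame_index) of the row closest to the mean."""
    n_rows = len(rows)
    n_cols = len(rows[0])
    # Centroid: column-wise mean over all selected frames.
    centroid = [sum(row[j] for row in rows) / n_rows for j in range(n_cols)]
    # Closest frame: minimal Euclidean distance to the centroid.
    closest = min(range(n_rows), key=lambda i: math.dist(rows[i], centroid))
    return frame_mapping[closest]


rows = [[0.0, 0.0], [1.0, 1.0], [2.0, 2.0], [10.0, 10.0]]
frame_mapping = {0: (0, 0), 1: (0, 1), 2: (1, 0), 3: (1, 1)}
result = centroid_frame(rows, frame_mapping)
```

Here the centroid is (3.25, 3.25), so the third row is closest and the result is trajectory 1, frame 0. The outlier at (10, 10) shifts the centroid but is not itself selected.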

get_comparison_data(comparison_name: str, sub_comparison_name: str) -> Tuple[ndarray, ndarray]

Get X (features) and y (labels) for a specific sub-comparison of a comparison.

This method provides the central access point for comparison data, combining ComparisonData metadata with efficient data processing. Used by modules to get ready-to-use datasets for analysis.

Parameters

comparison_name : str

Name of the comparison to retrieve data from

sub_comparison_name : str

Name of the specific sub-comparison within the comparison

Returns

Tuple[np.ndarray, np.ndarray]

Tuple of (X, y) where:

  • X is the feature matrix with selected features and frames

  • y is the label array for the comparison groups

Raises

ValueError

If comparison not found, sub-comparison not found, or no data available

Examples

>>> import numpy as np
>>> # Get data for a binary comparison
>>> X, y = pipeline_data.get_comparison_data("folded_vs_unfolded", "main")
>>> print(f"Features shape: {X.shape}")
>>> print(f"Labels: {np.unique(y)}")
>>> # Get data for one-vs-rest comparison
>>> X, y = pipeline_data.get_comparison_data("conformations", "folded_vs_rest")
>>> print(f"Data shape: {X.shape}, Labels: {np.unique(y)}")
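
The relationship between comparison groups and the (X, y) pair can be illustrated with a one-vs-rest example. The sketch below is an assumption about the general idea, not the library's code: build_one_vs_rest is a hypothetical helper, plain lists stand in for NumPy arrays, and the label encoding (1 for the target group, 0 for the rest) is chosen for illustration.

```python
from typing import Dict, List, Tuple


def build_one_vs_rest(
    groups: Dict[str, List[List[float]]], target: str
) -> Tuple[List[List[float]], List[int]]:
    """Stack the rows of all groups and label the target group 1, the rest 0."""
    X: List[List[float]] = []
    y: List[int] = []
    for name, rows in groups.items():
        for row in rows:
            X.append(row)
            y.append(1 if name == target else 0)
    return X, y


# Hypothetical frame groups, each row being one frame's selected features.
groups = {
    "folded": [[0.1, 0.2], [0.2, 0.1]],
    "unfolded": [[0.9, 0.8]],
    "intermediate": [[0.5, 0.5]],
}
X, y = build_one_vs_rest(groups, target="folded")
```

X and y always have the same number of rows, so the pair can be handed directly to a classifier.
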

get_config() -> dict

Get current configuration parameters.

Returns the current configuration settings for chunk_size, cache_dir, and use_memmap that are used across the pipeline.

Returns

dict

Dictionary containing current configuration values

Examples

Check current configuration:

>>> pipeline_data = PipelineData(chunk_size=1000, use_memmap=True)
>>> config = pipeline_data.get_config()
>>> print(config['chunk_size'])  # 1000
>>> print(config['use_memmap'])   # True

clear_matrix_cache(feature_selector: str | None = None, data_selector: str | None = None) -> None

Clear matrix cache when data changes.

This method clears cached memmap matrices to ensure fresh data is used after modifications. Only affects cached matrices when use_memmap=True.

Parameters

feature_selector : str, optional

If provided, only clear cache for this feature selector.

data_selector : str, optional

If provided, only clear cache using this data selector.

Returns

None

Clears matching cache entries

Examples

>>> # Clear specific combination
>>> pipeline_data.clear_matrix_cache("contacts_only", "folded")
>>> # Clear all with feature selector
>>> pipeline_data.clear_matrix_cache("contacts_only")
>>> # Clear all with data selector
>>> pipeline_data.clear_matrix_cache(data_selector="folded")
>>> # Clear all cached matrices
>>> pipeline_data.clear_matrix_cache()

Notes

  • Both None: clears entire cache

  • Only feature_selector: clears all with this feature_selector

  • Only data_selector: clears all using this data_selector

  • Both specified: clears specific combination
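
The four cases above amount to treating None as a wildcard when matching cache keys. A minimal sketch of that matching rule follows; it assumes the cache is keyed by (feature_selector, data_selector) pairs, which is not confirmed here, and clear_matching is a hypothetical helper.

```python
from typing import Dict, Optional, Tuple

CacheKey = Tuple[str, Optional[str]]


def clear_matching(
    cache: Dict[CacheKey, object],
    feature_selector: Optional[str] = None,
    data_selector: Optional[str] = None,
) -> int:
    """Remove entries matching the given selectors; None acts as a wildcard."""
    doomed = [
        key
        for key in cache
        if (feature_selector is None or key[0] == feature_selector)
        and (data_selector is None or key[1] == data_selector)
    ]
    for key in doomed:
        del cache[key]
    return len(doomed)


cache: Dict[CacheKey, object] = {
    ("contacts_only", "folded"): "matrix_a",
    ("contacts_only", "unfolded"): "matrix_b",
    ("distances_only", "folded"): "matrix_c",
    ("distances_only", None): "matrix_d",
}

removed_pair = clear_matching(cache, "contacts_only", "folded")  # specific combination
removed_data = clear_matching(cache, data_selector="folded")  # all using this data selector
removed_all = clear_matching(cache)  # entire remaining cache
```

Collecting the keys before deleting avoids modifying the dictionary while iterating over it.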