Pipeline Manager
Central pipeline manager with automatic data injection.
This module provides the PipelineManager class that serves as the central orchestration point for all analysis workflows. It uses AutoInjectProxy to automatically inject PipelineData into manager methods that need it.
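The injection idea described above can be pictured with a minimal sketch. This is not the mdxplain implementation: `InjectingProxy`, `DemoManager`, and the plain dict standing in for PipelineData are hypothetical names used only for illustration. The sketch assumes that a method opts into injection by declaring `pipeline_data` as its first parameter after `self`.

```python
import functools
import inspect


class InjectingProxy:
    """Hypothetical stand-in for an auto-injecting proxy (not mdxplain's code)."""

    def __init__(self, manager, pipeline_data):
        self._manager = manager
        self._pipeline_data = pipeline_data

    def __getattr__(self, name):
        # Only called for attributes the proxy itself does not define.
        attr = getattr(self._manager, name)
        if not callable(attr):
            return attr
        # Inject only when the method declares a `pipeline_data` parameter.
        if "pipeline_data" not in inspect.signature(attr).parameters:
            return attr
        # Assumption: pipeline_data is the first parameter after self.
        return functools.partial(attr, self._pipeline_data)


class DemoManager:
    """Hypothetical manager with one data method and one utility method."""

    def load(self, pipeline_data, path):
        pipeline_data["paths"].append(path)
        return len(pipeline_data["paths"])

    def supported_formats(self):
        return [".xtc", ".dcd"]


data = {"paths": []}
proxy = InjectingProxy(DemoManager(), data)
count = proxy.load("../data")        # pipeline_data is supplied by the proxy
formats = proxy.supported_formats()  # utility method, passed through unchanged
```

Callers never pass the data container themselves, which is why the examples on this page call manager methods without a `pipeline_data` argument.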
- class mdxplain.pipeline.manager.pipeline_manager.PipelineManager(stride: int = 1, concat: bool = False, selection: str | None = None, use_memmap: bool = True, chunk_size: int = 2000, dtype: type = <class 'numpy.float32'>, cache_dir: str = './cache', max_memory_gb: float = 6.0, show_progress: bool = True, use_stability_config: bool | None = None)
Central pipeline manager with automatic PipelineData injection.
This class provides a unified interface to all analysis modules through AutoInjectProxy instances that automatically inject PipelineData into methods that expect it while leaving utility methods unchanged.
The PipelineManager is designed to simplify the usage of the mdxplain pipeline system by providing a single entry point for all analysis workflows. It manages the trajectory loading, feature computation, clustering, and decomposition processes in a cohesive manner.
It is the single entry point for:
Trajectory loading and validation
Feature computation
Feature selection
Clustering analysis
Decomposition analysis
Data selection
Comparison management
Feature importance analysis
General analysis operations
Visualization and plotting
Examples
Basic pipeline workflow:
>>> pipeline = PipelineManager()
>>>
>>> # Methods expecting pipeline_data get automatic injection
>>> pipeline.trajectory.load_trajectories('../data')
>>> pipeline.feature.compute_features('distances', 'res CA')
>>> pipeline.feature_selector.create('my_selection')
>>> pipeline.feature_selector.add('my_selection', 'distances', 'res ALA')
>>> pipeline.feature_selector.select('my_selection')
>>> pipeline.clustering.cluster('my_features', 'dbscan', eps=0.5)
>>>
>>> # Utility methods work without injection
>>> valid = pipeline.trajectory.validate_selection('res CA')
>>> formats = pipeline.trajectory.get_supported_formats()
>>>
>>> # Advanced: Direct data access
>>> summary = pipeline.data.get_data_summary()
- __init__(stride: int = 1, concat: bool = False, selection: str | None = None, use_memmap: bool = True, chunk_size: int = 2000, dtype: type = <class 'numpy.float32'>, cache_dir: str = './cache', max_memory_gb: float = 6.0, show_progress: bool = True, use_stability_config: bool | None = None)
Initialize the pipeline manager with configuration for all managers.
Parameters
- stride : int, default=1
Default stride for trajectory loading. Larger values reduce memory footprint and downstream compute at the cost of temporal resolution. For large datasets, consider stride > 1 to keep feature matrices smaller and more cache-friendly.
- concat : bool, default=False
Default concatenation setting for trajectories. When True, multiple trajectories are concatenated into one, which simplifies indexing but may increase memory pressure for very long series.
- selection : str, optional
Default MDTraj selection string for trajectories. Use this to restrict atom selection at load time to reduce downstream features.
- use_memmap : bool, default=True
Whether to use memory mapping for feature and decomposition data. Enable this for large datasets that do not fit comfortably in RAM. When stability settings are enabled, the pipeline lowers I/O priority during large sequential scans to keep the system responsive.
- chunk_size : int, default=2000
Processing chunk size for feature and decomposition computation. Larger chunks reduce overhead but increase peak memory and I/O burst size. For very large memmaps, a moderate chunk size tends to provide the best I/O behavior.
- dtype : type, default=np.float32
Data type for feature matrices (float32 or float64). float32 uses half the memory of float64 and is sufficient for most MD analysis. Use float64 only if you need extra numerical precision, for example for numerically stable eigenvalues.
- cache_dir : str, default="./cache"
Cache directory path for all managers. For memmap-heavy workloads, use a fast local SSD to avoid I/O contention.
- max_memory_gb : float, default=6.0
Maximum memory in GB for dataset processing. Used for memory-aware sampling in algorithms like DecisionTree. Increase this on large workstations to reduce sampling, or keep it low to stay responsive.
- show_progress : bool, default=True
Enable or disable progress bars globally (tqdm). When False, all progress bars are suppressed. This also disables resource limit reporting unless you call report_resource_limits explicitly.
- use_stability_config : bool or None, default=None
Choose how resource limits are applied at initialization. The default (None) enables stability settings only when use_memmap is True; otherwise it preserves OS defaults. When True, the pipeline applies a stability policy immediately: it lowers CPU priority (nice=15), keeps two CPU cores free via affinity, reduces I/O priority to “low”, and caps BLAS/OpenMP thread pools to the active CPU set. When False, no resource limits are applied at startup and the process keeps OS defaults. All settings are stored in pipeline.config.performance; any change there is applied immediately.
Returns
- None
Initializes PipelineManager with automatic data injection
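To make the trade-offs between stride, dtype, and chunk_size concrete, here is a back-of-the-envelope sketch. It assumes a dense frames-by-features matrix that is processed in chunks of frames; `estimate_feature_matrix` is a hypothetical helper, not part of mdxplain, and real memory use depends on the feature type and on what the pipeline keeps in RAM versus in memmaps.

```python
import math

ITEMSIZE = {"float32": 4, "float64": 8}  # bytes per stored value


def estimate_feature_matrix(n_frames, n_features, stride=1,
                            dtype="float32", chunk_size=2000):
    """Estimate size and chunk count of a dense (frames x features) matrix."""
    # Number of frames kept by slicing with a step, i.e. len(range(0, n, stride)).
    kept_frames = math.ceil(n_frames / stride)
    row_bytes = n_features * ITEMSIZE[dtype]
    return {
        "frames": kept_frames,
        "total_bytes": kept_frames * row_bytes,
        "chunks": math.ceil(kept_frames / chunk_size),
        "peak_chunk_bytes": min(chunk_size, kept_frames) * row_bytes,
    }


full = estimate_feature_matrix(100_000, 5_000)
strided = estimate_feature_matrix(100_000, 5_000, stride=10)
double = estimate_feature_matrix(100_000, 5_000, dtype="float64")
```

Under these assumptions, 100,000 frames with 5,000 features occupy 2 GB in float32 and 4 GB in float64, while stride=10 brings the float32 matrix down to 0.2 GB. Each chunk of 2,000 frames accounts for 40 MB.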
- report_resource_limits() -> None
Report applied resource limits to stdout when progress output is enabled.
This method is public to let users re-print the current limits after changing settings at runtime. It is intentionally lightweight and avoids strong formatting guarantees; the output is meant for human inspection rather than machine parsing.
Returns
- None
Prints a one-line summary plus any warnings when applicable.
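The limits reported here follow the use_stability_config policy described for `__init__`. The sketch below models two parts of that policy as pure functions, so nothing in the running process is changed: how the tri-state flag resolves, and how many cores remain when two are kept free. Both function names are hypothetical, and the choice of which core ids are kept (the lowest ones) is an assumption.

```python
from typing import Optional, Set


def resolve_stability(use_stability_config: Optional[bool], use_memmap: bool) -> bool:
    """Return whether a stability policy would be applied at startup."""
    if use_stability_config is None:
        # Documented default: follow the memmap setting.
        return use_memmap
    return use_stability_config


def cpus_to_use(available: Set[int], reserve: int = 2) -> Set[int]:
    """Pick CPU ids to run on, leaving `reserve` cores free but keeping at least one."""
    ordered = sorted(available)
    keep = max(1, len(ordered) - reserve)
    return set(ordered[:keep])


default_with_memmap = resolve_stability(None, use_memmap=True)
default_without_memmap = resolve_stability(None, use_memmap=False)
forced_on = resolve_stability(True, use_memmap=False)
quad_core = cpus_to_use({0, 1, 2, 3})
single_core = cpus_to_use({0})
```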
- property trajectory: TrajectoryManager
Access trajectory management with automatic PipelineData injection.
Returns
- TrajectoryManager
Trajectory manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.
- set_show_progress(enabled: bool) -> None
Public helper to toggle progress bars at runtime.
Parameters
- enabled : bool
Desired progress bar state. True enables bars, False suppresses them.
Returns
- None
Updates the progress controller for subsequent operations.
Examples
>>> pipeline = PipelineManager(show_progress=True)
>>> pipeline.set_show_progress(False)
- property feature: FeatureManager
Access feature computation with automatic PipelineData injection.
Returns
- FeatureManager
Feature manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.
- property clustering: ClusterManager
Access clustering analysis with automatic PipelineData injection.
Returns
- ClusterManager
Cluster manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.
- property decomposition: DecompositionManager
Access decomposition analysis with automatic PipelineData injection.
Returns
- DecompositionManager
Decomposition manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.
- property feature_selector: FeatureSelectorManager
Access feature selector management with automatic PipelineData injection.
Returns
- FeatureSelectorManager
Feature selector manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.
- property data_selector: DataSelectorManager
Access data selector management with automatic PipelineData injection.
Returns
- DataSelectorManager
Data selector manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.
- property comparison: ComparisonManager
Access comparison management with automatic PipelineData injection.
Returns
- ComparisonManager
Comparison manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.
- property feature_importance: FeatureImportanceManager
Access feature importance analysis with automatic PipelineData injection.
Returns
- FeatureImportanceManager
Feature importance manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.
- property analysis: AnalysisManager
Access analysis operations with automatic PipelineData injection.
Returns
- AnalysisManager
Analysis manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.
- property plots: PlotsManager
Access plotting and visualization operations.
Returns
- PlotsManager
Plots manager for creating visualizations. Provides three access patterns:
Direct: pipeline.plots.landscape(…)
Decomposition-focused: pipeline.plots.decomposition.landscape(…)
Clustering-focused: pipeline.plots.clustering.landscape(…)
Examples
>>> # Direct landscape plot
>>> pipeline.plots.landscape("pca", [0, 1])
>>> # Decomposition-focused
>>> pipeline.plots.decomposition.landscape("pca", [0, 1])
>>> # Clustering-focused with centers
>>> pipeline.plots.clustering.landscape(
...     "dbscan", "pca", [0, 1], show_centers=True
... )
- property structure_visualization: StructureVisualizationManager
Access 3D structure visualization with automatic PipelineData injection.
Returns
- StructureVisualizationManager
Structure visualization manager with automatic PipelineData injection.
Examples
>>> # Beta-factor visualization
>>> pipeline.structure_visualization.visualize_importance_beta_factors(
...     "dt_analysis", "cluster_0_vs_rest", n_top=10
... )
- property data
Direct access to pipeline data (advanced usage).
Provides direct access to the central PipelineData container for advanced users who need to inspect or manipulate data directly. Normal usage should go through the manager properties.
Returns
- PipelineData
Central pipeline data container with all analysis data
- summary() -> Dict[str, Any]
Get summary of all pipeline data.
Returns
- dict
Summary information about all loaded and computed data
- add_custom_metadata(name: str, value: Any, overwrite: bool = False, warn_if_large: bool = True, max_size_gb: float | None = None) -> None
Register user-defined custom metadata in the pipeline state.
Parameters
- name : str
Metadata key.
- value : Any
Metadata payload to persist with the pipeline.
- overwrite : bool, default=False
If False, existing keys raise ValueError.
- warn_if_large : bool, default=True
Emit RuntimeWarning when the estimated object size exceeds the configured threshold.
- max_size_gb : float, optional
Explicit warning threshold in GB. If None, uses pipeline.data.max_memory_gb.
Returns
- None
Stores metadata in-place.
Notes
TODO: Add optional disk-backed backend (e.g. zarr/proxy) for large nested metadata payloads. Current implementation keeps metadata in RAM.
- get_custom_metadata(name: str) -> Any
Retrieve a registered custom metadata payload by key.
Parameters
- name : str
Metadata key.
Returns
- Any
Stored metadata payload.
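The overwrite and size-warning semantics documented for custom metadata can be sketched with a small in-memory store. `MetadataStore` is a hypothetical class, not the mdxplain implementation. The shallow `sys.getsizeof` estimate and the `KeyError` raised for unknown keys are assumptions made for this example.

```python
import sys
import warnings


class MetadataStore:
    """Hypothetical in-memory store mirroring the documented semantics."""

    def __init__(self, max_memory_gb=6.0):
        self.max_memory_gb = max_memory_gb
        self._items = {}

    def add(self, name, value, overwrite=False, warn_if_large=True, max_size_gb=None):
        if name in self._items and not overwrite:
            raise ValueError(f"Metadata key {name!r} already exists")
        limit_gb = self.max_memory_gb if max_size_gb is None else max_size_gb
        # Shallow estimate only: nested payloads are undercounted.
        size_gb = sys.getsizeof(value) / 1024**3
        if warn_if_large and size_gb > limit_gb:
            warnings.warn(
                f"Metadata {name!r} is about {size_gb:.6f} GB (limit {limit_gb} GB)",
                RuntimeWarning,
            )
        self._items[name] = value

    def get(self, name):
        return self._items[name]  # KeyError for unknown keys (assumption)


store = MetadataStore()
store.add("force_field", "amber99sb")
store.add("force_field", "charmm36", overwrite=True)  # explicit replacement
current = store.get("force_field")
```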
- clear_all() -> None
Clear all pipeline data.
Resets the entire pipeline to empty state, clearing all trajectories, features, clustering, and decomposition results.
- close() -> None
Release in-memory resources and memmap handles owned by this pipeline.
This method closes memmap-backed arrays and trajectory runtime caches without deleting cache files on disk. It is safe to call multiple times.
Returns
- None
Frees resources and detaches in-memory references.
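Because close() is documented as safe to call repeatedly, it fits the standard `contextlib.closing` pattern. The sketch below uses a hypothetical `DemoPipeline` stand-in rather than a real PipelineManager, so it only illustrates the calling pattern and makes no claim about mdxplain's internals.

```python
from contextlib import closing


class DemoPipeline:
    """Hypothetical stand-in exposing an idempotent close()."""

    def __init__(self):
        self.open_handles = ["features.dat", "pca.dat"]

    def close(self):
        # Clearing an already empty list is a no-op, so repeated calls are safe.
        self.open_handles.clear()


with closing(DemoPipeline()) as pipeline:
    handles_during = len(pipeline.open_handles)

pipeline.close()  # second call: nothing left to release
handles_after = len(pipeline.open_handles)
```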
- save_to_single_file(save_path: str) -> None
Save complete pipeline to single pickle file.
This method saves the entire PipelineData object including all computed features, trajectories, clusterings, decompositions, and metadata to a single file. Memmap files remain in cache directory.
Parameters
- save_path : str
Path where to save the complete pipeline
Returns
- None
Saves the complete pipeline to the specified path
Examples
>>> pipeline.save_to_single_file('complete_analysis.pkl')
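As a generic illustration of a single-file pickle round trip (not mdxplain's serialization code), the sketch below saves and restores a stand-in state dict. The dict contents are hypothetical. Note that unpickling can execute arbitrary code, so pipeline files should only be loaded from trusted sources.

```python
import pickle
import tempfile
from pathlib import Path

# Hypothetical stand-in for the pipeline state; the real PipelineData is richer.
state = {"features": ["distances"], "chunk_size": 2000}

with tempfile.TemporaryDirectory() as tmp:
    save_path = Path(tmp) / "complete_analysis.pkl"
    with open(save_path, "wb") as handle:
        pickle.dump(state, handle, protocol=pickle.HIGHEST_PROTOCOL)
    with open(save_path, "rb") as handle:
        restored = pickle.load(handle)
```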
- static load_from_single_file(load_path: str, cache_dir: str | None = './cache', chunk_size: int = 1000, stride: int = 1, concat: bool = False, selection: str | None = None, show_progress: bool = True) -> PipelineManager
Load complete pipeline from single pickle file.
This static method creates a new PipelineManager instance with specified cache directory and loads pipeline state from file. Memmap files are expected in the cache directory.
Parameters
- load_path : str
Path to the saved pipeline pickle file
- cache_dir : str, optional
Fallback cache directory used only when saved cache metadata is unavailable. By default, the cache scope stored in the single-file payload is reused to preserve pipeline identity.
- chunk_size : int, default=1000
Default chunk size for future operations
- stride : int, default=1
Default stride for future trajectory loading
- concat : bool, default=False
Default concat mode for future trajectory loading
- selection : str, optional
Default MDTraj selection string for trajectories
- show_progress : bool, default=True
Enable or disable progress bars globally (tqdm)
Returns
- PipelineManager
New PipelineManager instance with loaded pipeline state
Examples
>>> loaded_pipeline = PipelineManager.load_from_single_file('analysis.pkl')
>>> loaded_pipeline.print_info()
>>> # Load with custom cache directory
>>> loaded_pipeline = PipelineManager.load_from_single_file(
...     'analysis.pkl',
...     cache_dir='./my_cache'
... )
>>> # Load with custom trajectory defaults for adding more data
>>> loaded_pipeline = PipelineManager.load_from_single_file(
...     'analysis.pkl',
...     stride=10
... )
>>> # Now load additional trajectories with stride=10
>>> loaded_pipeline.trajectory.load_trajectories('new_data/')
- create_sharable_archive(archive_path: str, compression: str = 'zst', exclude_visualizations: bool = True, include_structure_files: bool = True, compression_level: int | None = None, zstd_threads: int | None = None, zstd_reserve_cores: int = 2, sha: bool | str = True, overwrite: bool = False) -> str
Create sharable compressed archive with pipeline and essential data.
Creates compressed tar archive containing pipeline pickle file and all necessary memmap files from cache directory. Excludes visualization outputs by default for smaller archive size.
Parameters
- archive_path : str
Path for output archive (extension added automatically)
- compression : str, default="zst"
Compression method: "zst", "bz2", or "gz"
- exclude_visualizations : bool, default=True
If True, exclude PNG/PDF/SVG plot outputs
- include_structure_files : bool, default=True
If True, include PDB/PML structure files
- compression_level : int, optional
Compression level override (e.g. zst level 1-19).
- zstd_threads : int, optional
Thread count for zstd compression. If None, chosen automatically.
- zstd_reserve_cores : int, default=2
Number of CPU cores to keep free when zstd thread count is automatic.
- sha : bool or str, default=True
If True, write <archive>.sha next to the created archive. When a string is provided, it is used as the explicit SHA256 output path.
- overwrite : bool, default=False
If True, replace existing archive outputs. When False, existing archive or SHA256 outputs raise FileExistsError.
Returns
- str
Path to created archive file
Examples
>>> # Minimal archive (only data)
>>> pipeline.create_sharable_archive("analysis.tar.zst")
>>> # Full archive (with visualizations)
>>> pipeline.create_sharable_archive(
...     "analysis_full.tar.zst",
...     exclude_visualizations=False
... )
>>> # Data only (no structure files)
>>> pipeline.create_sharable_archive(
...     "analysis_data.tar.zst",
...     include_structure_files=False
... )
>>> pipeline.create_sharable_archive(
...     "analysis.tar.zst",
...     sha="checksums/analysis.sha"
... )
Notes
zstd compression is optimized for fast runtime and low memory pressure
zstd compression is multithreaded by default
With use_memmap=False: Archive contains only pipeline.pkl + optional PDB/PML
With use_memmap=True: Archive contains pipeline.pkl + .dat files + zarr directories
Paths are preserved relative to cache directory
Memmap and zarr only included when use_memmap=True
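The archive-plus-checksum idea can be sketched with the standard library alone. This is an illustration under stated assumptions, not the mdxplain implementation: it uses gzip instead of zstd (which needs a third-party package on most Python versions), assumes the `.sha` sidecar holds just the hex digest, and the helper names `sha256_of_file` and `make_archive_with_sidecar` are hypothetical.

```python
import hashlib
import tarfile
import tempfile
from pathlib import Path


def sha256_of_file(path, block_size=1 << 20):
    """Hash a file in blocks so large archives never need to fit in RAM."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for block in iter(lambda: handle.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def make_archive_with_sidecar(source_dir, archive_path, overwrite=False):
    """Pack source_dir into a tar.gz and write <archive>.sha next to it."""
    source = Path(source_dir)
    archive = Path(archive_path)
    sidecar = Path(str(archive) + ".sha")
    if not overwrite and (archive.exists() or sidecar.exists()):
        raise FileExistsError(f"{archive} or {sidecar} already exists")
    with tarfile.open(archive, "w:gz") as tar:
        for item in sorted(source.rglob("*")):
            if item.is_file():
                # Store paths relative to the cache directory.
                tar.add(item, arcname=str(item.relative_to(source)))
    sidecar.write_text(sha256_of_file(archive) + "\n")
    return str(archive)


with tempfile.TemporaryDirectory() as tmp:
    cache = Path(tmp) / "cache"
    (cache / "features").mkdir(parents=True)
    (cache / "pipeline.pkl").write_bytes(b"state")
    (cache / "features" / "distances.dat").write_bytes(b"\x00" * 16)
    archive = make_archive_with_sidecar(cache, Path(tmp) / "analysis.tar.gz")
    with tarfile.open(archive) as tar:
        members = sorted(tar.getnames())
    sidecar_digest = Path(archive + ".sha").read_text().strip()
    digest_matches = sidecar_digest == sha256_of_file(archive)
    try:
        make_archive_with_sidecar(cache, archive)  # overwrite=False by default
        second_call_failed = False
    except FileExistsError:
        second_call_failed = True
```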
- static load_from_archive(file_path: str, cache_dir: str = './cache', verify: bool = True, sha: str | None = None, download_url: str | None = None, overwrite: bool = False, chunk_size: int = 1000, stride: int = 1, concat: bool = False, selection: str | None = None, show_progress: bool = True) -> PipelineManager
Load pipeline from sharable archive.
Extracts compressed archive, moves cache files to specified cache directory, and loads pipeline state. Automatically repairs memmap file paths to point to new cache location.
Parameters
- file_path : str
Local archive path, local download target, or remote archive URL.
- cache_dir : str, default="./cache"
Target cache directory for extracted files
- verify : bool, default=True
Whether to validate the archive via SHA256 before loading. When sha is provided, SHA256 verification is performed even if verify is False.
- sha : str, optional
SHA256 input used for archive verification. May be provided as a raw SHA256 hex string, a local path to a .sha file, or a URL. When verification is enabled and sha is missing, loading fails.
- download_url : str, optional
Remote source URL used when file_path should act as the local archive target. When omitted and file_path is itself a URL, downloads are stored under <cache_dir>/downloads/.
- overwrite : bool, default=False
For remote URLs, controls whether an existing local target file is replaced. When False, an existing downloaded file is reused and a warning is emitted.
- chunk_size : int, default=1000
Default chunk size for future operations
- stride : int, default=1
Default stride for future trajectory loading
- concat : bool, default=False
Default concatenation setting for future trajectory loading
- selection : str, optional
Default MDTraj selection string for trajectories
- show_progress : bool, default=True
Enable or disable progress bars globally (tqdm)
Returns
- PipelineManager
Loaded pipeline instance
Examples
>>> # Load a remote archive with explicit verification disabled
>>> pipeline = PipelineManager.load_from_archive(
...     "local_analysis.tar.zst",
...     cache_dir="./my_cache",
...     download_url="https://example.org/analysis.tar.zst",
...     verify=False
... )
>>> # Load with SHA256 verification from a sidecar file
>>> pipeline = PipelineManager.load_from_archive(
...     "analysis.tar.zst",
...     verify=True,
...     sha="analysis.tar.zst.sha"
... )
Notes
Extracts to temporary directory
Moves cache files to specified cache_dir
Automatically repairs memmap paths for portability
Cache directory created if it doesn’t exist
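The interplay of verify and sha described in the parameters can be summarized as a small decision function. This is a sketch of the documented rules only. The function names are hypothetical, the hex/URL/path classification is a heuristic, and the `ValueError` is an assumption, since the documentation only states that loading fails.

```python
import re
from urllib.parse import urlparse

_HEX64 = re.compile(r"[0-9a-fA-F]{64}")


def classify_sha_input(sha):
    """Return 'hex', 'url', or 'path' for a sha argument (heuristic)."""
    if _HEX64.fullmatch(sha):
        return "hex"
    if urlparse(sha).scheme in ("http", "https"):
        return "url"
    return "path"


def verification_plan(verify, sha):
    """Return None to skip verification, else how the sha would be resolved."""
    if sha is not None:
        # A supplied sha is always checked, even when verify is False.
        return classify_sha_input(sha)
    if verify:
        # Exception type is an assumption; the docs only say loading fails.
        raise ValueError("verify=True requires a sha value")
    return None


skipped = verification_plan(False, None)
from_sidecar = verification_plan(True, "analysis.tar.zst.sha")
from_hex = verification_plan(False, "ab" * 32)
from_url = verification_plan(True, "https://example.org/analysis.tar.zst.sha")
```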
- print_info() -> None
Print comprehensive pipeline information.
This method prints information from ALL managers to provide a complete overview of the pipeline state.
Returns
- None
Prints comprehensive pipeline information to console
Examples
>>> pipeline.print_info()
======= PIPELINE INFORMATION =======
--- Trajectory Data ---
Loaded 3 trajectories:
[0] system1_traj1: 1000 frames
[1] system1_traj2: 1500 frames
[2] system2_traj1: 800 frames
--- Feature Data ---
Feature Types: 2 (distances, contacts)
--- Clustering Data ---
Clustering Names: 1 (conformations)
(… information from all managers …)
- update_config(chunk_size: int = None, cache_dir: str = None, use_memmap: bool = None)
Update pipeline configuration parameters at runtime.
Allows modification of key configuration parameters after pipeline initialization. Changes are propagated to all managers and the central PipelineData container.
Parameters
- chunk_size : int, optional
New chunk size for processing operations. Must be positive integer.
- cache_dir : str, optional
New cache directory path. Directory will be created if it doesn’t exist.
- use_memmap : bool, optional
Whether to use memory mapping for data storage operations.
Returns
- None
Updates configuration in all components
Raises
- ValueError
If chunk_size is not a positive integer
- OSError
If cache_dir cannot be created
Examples
Update chunk size for better memory management:
>>> pipeline.update_config(chunk_size=5000)
Change cache directory and enable memory mapping:
>>> pipeline.update_config(cache_dir="/tmp/mdx_cache", use_memmap=True)
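The validation rules listed under Raises can be sketched as follows. `apply_updates` and `validate_chunk_size` are hypothetical helpers operating on a plain dict, not mdxplain code. Rejecting booleans as chunk sizes is an assumption made for this example.

```python
import os
import tempfile


def validate_chunk_size(chunk_size):
    """Accept only positive integers; bool is excluded although it subclasses int."""
    if isinstance(chunk_size, bool) or not isinstance(chunk_size, int) or chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")
    return chunk_size


def apply_updates(config, chunk_size=None, cache_dir=None, use_memmap=None):
    """Return a copy of config with only the explicitly passed values changed."""
    updated = dict(config)
    if chunk_size is not None:
        updated["chunk_size"] = validate_chunk_size(chunk_size)
    if cache_dir is not None:
        os.makedirs(cache_dir, exist_ok=True)  # raises OSError if creation fails
        updated["cache_dir"] = cache_dir
    if use_memmap is not None:
        updated["use_memmap"] = use_memmap
    return updated


base = {"chunk_size": 2000, "cache_dir": "./cache", "use_memmap": False}

with tempfile.TemporaryDirectory() as tmp:
    new_cache = os.path.join(tmp, "mdx_cache")
    changed = apply_updates(base, chunk_size=5000, cache_dir=new_cache, use_memmap=True)
    cache_created = os.path.isdir(new_cache)
```

Arguments left at None keep their previous value, which matches the optional parameters in the signature above.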
- get_config() -> dict
Get current pipeline configuration parameters.
Returns the current configuration settings that are active across all pipeline components.
Returns
- dict
Dictionary containing current configuration values
Examples
Check current configuration:
>>> pipeline = PipelineManager(chunk_size=1000, use_memmap=True)
>>> config = pipeline.get_config()
>>> print(f"Using chunk size: {config['chunk_size']}")
>>> print(f"Memory mapping: {config['use_memmap']}")