Pipeline Manager

Central pipeline manager with automatic data injection.

This module provides the PipelineManager class that serves as the central orchestration point for all analysis workflows. It uses AutoInjectProxy to automatically inject PipelineData into manager methods that need it.

class mdxplain.pipeline.manager.pipeline_manager.PipelineManager(stride: int = 1, concat: bool = False, selection: str | None = None, use_memmap: bool = True, chunk_size: int = 2000, dtype: type = <class 'numpy.float32'>, cache_dir: str = './cache', max_memory_gb: float = 6.0, show_progress: bool = True, use_stability_config: bool | None = None)

Central pipeline manager with automatic PipelineData injection.

This class provides a unified interface to all analysis modules through AutoInjectProxy instances that automatically inject PipelineData into methods that expect it while leaving utility methods unchanged.
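The injection behaviour described above can be illustrated with a small, self-contained sketch. This is not the mdxplain AutoInjectProxy; the names InjectingProxy and DemoManager are hypothetical, and the sketch assumes that pipeline_data is the first parameter of any method that expects it.

```python
import inspect
from functools import wraps


class InjectingProxy:
    """Hypothetical stand-in for AutoInjectProxy (not the mdxplain class)."""

    def __init__(self, manager, pipeline_data):
        self._manager = manager
        self._pipeline_data = pipeline_data

    def __getattr__(self, name):
        attr = getattr(self._manager, name)
        if not callable(attr):
            return attr
        # Inject only when the method declares a pipeline_data parameter.
        if "pipeline_data" not in inspect.signature(attr).parameters:
            return attr

        @wraps(attr)
        def injected(*args, **kwargs):
            # Assumption: pipeline_data is the first parameter after self.
            return attr(self._pipeline_data, *args, **kwargs)

        return injected


class DemoManager:
    """Toy manager with one data-dependent and one utility method."""

    def count_items(self, pipeline_data, key):
        return len(pipeline_data[key])

    def supported_formats(self):
        return [".xtc", ".dcd"]


proxy = InjectingProxy(DemoManager(), {"frames": [0, 1, 2]})
n_frames = proxy.count_items("frames")  # pipeline_data supplied by the proxy
formats = proxy.supported_formats()     # passed through unchanged
```

Inspecting the signature at lookup time keeps utility methods untouched, which matches the split between injected and non-injected calls shown in the examples below.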

The PipelineManager is designed to simplify the usage of the mdxplain pipeline system by providing a single entry point for all analysis workflows. It manages the trajectory loading, feature computation, clustering, and decomposition processes in a cohesive manner.

It is the single entry point for:

  • Trajectory loading and validation

  • Feature computation

  • Feature selection

  • Clustering analysis

  • Decomposition analysis

  • Data selection

  • Comparison management

  • Feature importance analysis

  • General analysis operations

  • Visualization and plotting

Examples

Basic pipeline workflow:

>>> pipeline = PipelineManager()
>>>
>>> # Methods expecting pipeline_data get automatic injection
>>> pipeline.trajectory.load_trajectories('../data')
>>> pipeline.feature.compute_features('distances', 'res CA')
>>> pipeline.feature_selector.create('my_selection')
>>> pipeline.feature_selector.add('my_selection', 'distances', 'res ALA')
>>> pipeline.feature_selector.select('my_selection')
>>> pipeline.clustering.cluster('my_selection', 'dbscan', eps=0.5)
>>>
>>> # Utility methods work without injection
>>> valid = pipeline.trajectory.validate_selection('res CA')
>>> formats = pipeline.trajectory.get_supported_formats()
>>>
>>> # Advanced: Direct data access
>>> summary = pipeline.data.get_data_summary()

__init__(stride: int = 1, concat: bool = False, selection: str | None = None, use_memmap: bool = True, chunk_size: int = 2000, dtype: type = <class 'numpy.float32'>, cache_dir: str = './cache', max_memory_gb: float = 6.0, show_progress: bool = True, use_stability_config: bool | None = None)

Initialize the pipeline manager with configuration for all managers.

Parameters

stride : int, default=1

Default stride for trajectory loading. Larger values reduce memory footprint and downstream compute at the cost of temporal resolution. For large datasets, consider stride > 1 to keep feature matrices smaller and more cache-friendly.

concat : bool, default=False

Default concatenation setting for trajectories. When True, multiple trajectories are concatenated into one, which simplifies indexing but may increase memory pressure for very long series.

selection : str, optional

Default MDTraj selection string for trajectories. Use this to restrict atom selection at load time to reduce downstream features.

use_memmap : bool, default=True

Whether to use memory mapping for feature and decomposition data. Enable this for large datasets that do not fit comfortably in RAM. When stability settings are enabled, the pipeline lowers I/O priority during large sequential scans to keep the system responsive.

chunk_size : int, default=2000

Processing chunk size for feature and decomposition computation. Larger chunks reduce overhead but increase peak memory and I/O burst size. For very large memmaps, a moderate chunk size tends to provide the best I/O behavior.

dtype : type, default=np.float32

Data type for feature matrices (float32 or float64). float32 halves memory use relative to float64 and is sufficient for most MD analysis. Use float64 only if you need extreme numerical precision or more stable eigenvalue computations.

cache_dir : str, default="./cache"

Cache directory path for all managers. For memmap-heavy workloads, use a fast local SSD to avoid I/O contention.

max_memory_gb : float, default=6.0

Maximum memory in GB for dataset processing. Used for memory-aware sampling in algorithms like DecisionTree. Increase this on large workstations to reduce sampling, or keep it low to stay responsive.

show_progress : bool, default=True

Enable or disable progress bars globally (tqdm). When False, all progress bars are suppressed. This also disables resource limit reporting unless you call report_resource_limits explicitly.

use_stability_config : bool or None, default=None

Choose how resource limits are applied at initialization. The default (None) enables stability settings only when use_memmap is True; otherwise it preserves OS defaults. When True, the pipeline applies a stability policy immediately: it lowers CPU priority (nice=15), keeps two CPU cores free via affinity, reduces I/O priority to “low”, and caps BLAS/OpenMP thread pools to the active CPU set. When False, no resource limits are applied at startup and the process keeps OS defaults. All settings are stored in pipeline.config.performance; any change there is applied immediately.
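To make the trade-offs between stride, dtype, and chunk_size more concrete, the following sketch estimates the size of a dense feature matrix and lists the row ranges a chunked pass would visit. The helper names feature_matrix_gb and chunk_bounds are hypothetical and are not part of mdxplain; the estimate uses binary gigabytes (GiB) and ignores any per-array overhead.

```python
import math


def feature_matrix_gb(n_frames, n_features, itemsize, stride=1):
    """Estimate the size in GiB of a dense (frames x features) matrix."""
    kept_frames = math.ceil(n_frames / stride)  # frames 0, stride, 2*stride, ...
    return kept_frames * n_features * itemsize / 1024**3


def chunk_bounds(n_rows, chunk_size):
    """Yield (start, stop) row ranges that cover n_rows in order."""
    for start in range(0, n_rows, chunk_size):
        yield start, min(start + chunk_size, n_rows)


# 100,000 frames x 5,000 features; float32 is 4 bytes, float64 is 8 bytes.
size_f32 = feature_matrix_gb(100_000, 5_000, itemsize=4)
size_f64 = feature_matrix_gb(100_000, 5_000, itemsize=8)
size_f32_strided = feature_matrix_gb(100_000, 5_000, itemsize=4, stride=10)

# Row ranges for 5,000 rows processed with chunk_size=2000.
chunks = list(chunk_bounds(5_000, 2_000))
```

In this example the float32 matrix needs roughly 1.86 GiB, the float64 matrix twice that, and a stride of 10 reduces the float32 matrix to about a tenth of its size.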

Returns

None

Initializes PipelineManager with automatic data injection
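The three-state behaviour of use_stability_config can be summarised in a few lines. This is a minimal sketch of the decision logic as described in the parameter text, not the library's code; the function names are hypothetical, and the lower bound of one usable core is an assumption.

```python
from typing import Optional


def stability_enabled(use_stability_config: Optional[bool], use_memmap: bool) -> bool:
    """Resolve the tri-state flag: None defers to use_memmap."""
    if use_stability_config is None:
        return use_memmap
    return use_stability_config


def usable_cores(available: int, reserve: int = 2) -> int:
    """Cores left after keeping `reserve` free (the floor of 1 is an assumption)."""
    return max(1, available - reserve)


default_with_memmap = stability_enabled(None, use_memmap=True)      # True
default_without_memmap = stability_enabled(None, use_memmap=False)  # False
forced_off = stability_enabled(False, use_memmap=True)              # False
cores_on_8 = usable_cores(8)                                        # 6
```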

report_resource_limits() -> None

Report applied resource limits to stdout when progress output is enabled.

This method is public to let users re-print the current limits after changing settings at runtime. It is intentionally lightweight and avoids strong formatting guarantees; the output is meant for human inspection rather than machine parsing.

Returns

None

Prints a one-line summary plus any warnings when applicable.

property trajectory: TrajectoryManager

Access trajectory management with automatic PipelineData injection.

Returns

TrajectoryManager

Trajectory manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.

set_show_progress(enabled: bool) -> None

Public helper to toggle progress bars at runtime.

Parameters

enabled : bool

Desired progress bar state. True enables bars, False suppresses them.

Returns

None

Updates the progress controller for subsequent operations.

Examples

>>> pipeline = PipelineManager(show_progress=True)
>>> pipeline.set_show_progress(False)

property feature: FeatureManager

Access feature computation with automatic PipelineData injection.

Returns

FeatureManager

Feature manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.

property clustering: ClusterManager

Access clustering analysis with automatic PipelineData injection.

Returns

ClusterManager

Cluster manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.

property decomposition: DecompositionManager

Access decomposition analysis with automatic PipelineData injection.

Returns

DecompositionManager

Decomposition manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.

property feature_selector: FeatureSelectorManager

Access feature selector management with automatic PipelineData injection.

Returns

FeatureSelectorManager

Feature selector manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.

property data_selector: DataSelectorManager

Access data selector management with automatic PipelineData injection.

Returns

DataSelectorManager

Data selector manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.

property comparison: ComparisonManager

Access comparison management with automatic PipelineData injection.

Returns

ComparisonManager

Comparison manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.

property feature_importance: FeatureImportanceManager

Access feature importance analysis with automatic PipelineData injection.

Returns

FeatureImportanceManager

Feature importance manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.

property analysis: AnalysisManager

Access analysis operations with automatic PipelineData injection.

Returns

AnalysisManager

Analysis manager with automatic PipelineData injection. All methods that expect pipeline_data parameter will receive it automatically.

property plots: PlotsManager

Access plotting and visualization operations.

Returns

PlotsManager

Plots manager for creating visualizations. Provides three access patterns:

  • Direct: pipeline.plots.landscape(…)

  • Decomposition-focused: pipeline.plots.decomposition.landscape(…)

  • Clustering-focused: pipeline.plots.clustering.landscape(…)

Examples

>>> # Direct landscape plot
>>> pipeline.plots.landscape("pca", [0, 1])
>>> # Decomposition-focused
>>> pipeline.plots.decomposition.landscape("pca", [0, 1])
>>> # Clustering-focused with centers
>>> pipeline.plots.clustering.landscape(
...     "dbscan", "pca", [0, 1], show_centers=True
... )

property structure_visualization: StructureVisualizationManager

Access 3D structure visualization with automatic PipelineData injection.

Returns

StructureVisualizationManager

Structure visualization manager with automatic PipelineData injection.

Examples

>>> # Beta-factor visualization
>>> pipeline.structure_visualization.visualize_importance_beta_factors(
...     "dt_analysis", "cluster_0_vs_rest", n_top=10
... )

property data

Direct access to pipeline data (advanced usage).

Provides direct access to the central PipelineData container for advanced users who need to inspect or manipulate data directly. Normal usage should go through the manager properties.

Returns

PipelineData

Central pipeline data container with all analysis data

summary() -> Dict[str, Any]

Get summary of all pipeline data.

Returns

dict

Summary information about all loaded and computed data

add_custom_metadata(name: str, value: Any, overwrite: bool = False, warn_if_large: bool = True, max_size_gb: float | None = None) -> None

Register user-defined custom metadata in the pipeline state.

Parameters

name : str

Metadata key.

value : Any

Metadata payload to persist with the pipeline.

overwrite : bool, default=False

If False, existing keys raise ValueError.

warn_if_large : bool, default=True

Emit RuntimeWarning when the estimated object size exceeds the configured threshold.

max_size_gb : float, optional

Explicit warning threshold in GB. If None, uses pipeline.data.max_memory_gb.

Returns

None

Stores metadata in-place.

Notes

TODO: Add optional disk-backed backend (e.g. zarr/proxy) for large nested metadata payloads. Current implementation keeps metadata in RAM.
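The overwrite and size-warning semantics documented for add_custom_metadata can be sketched with a plain dictionary. MetadataStore is a hypothetical in-memory stand-in, not the mdxplain implementation, and sys.getsizeof is used only as a crude, shallow size estimate; how the library actually estimates object size is not stated here.

```python
import sys
import warnings


class MetadataStore:
    """Hypothetical in-memory registry; not the mdxplain implementation."""

    def __init__(self, max_memory_gb=6.0):
        self.max_memory_gb = max_memory_gb
        self._items = {}

    def add(self, name, value, overwrite=False, warn_if_large=True, max_size_gb=None):
        if name in self._items and not overwrite:
            raise ValueError(f"Metadata key {name!r} already exists")
        threshold_gb = self.max_memory_gb if max_size_gb is None else max_size_gb
        # sys.getsizeof is shallow: it ignores nested objects, so it is only a
        # rough placeholder for a real size estimate.
        size_gb = sys.getsizeof(value) / 1024**3
        if warn_if_large and size_gb > threshold_gb:
            warnings.warn(f"Metadata {name!r} exceeds {threshold_gb} GB", RuntimeWarning)
        self._items[name] = value

    def get(self, name):
        return self._items[name]


store = MetadataStore()
store.add("force_field", "amber99sb")
store.add("force_field", "charmm36", overwrite=True)  # replaces the first value
current = store.get("force_field")
```

Calling store.add("force_field", ...) a third time without overwrite=True would raise ValueError, mirroring the documented default.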

get_custom_metadata(name: str) -> Any

Retrieve a registered custom metadata payload by key.

Parameters

name : str

Metadata key.

Returns

Any

Stored metadata payload.

clear_all() -> None

Clear all pipeline data.

Resets the entire pipeline to empty state, clearing all trajectories, features, clustering, and decomposition results.

close() -> None

Release in-memory resources and memmap handles owned by this pipeline.

This method closes memmap-backed arrays and trajectory runtime caches without deleting cache files on disk. It is safe to call multiple times.

Returns

None

Frees resources and detaches in-memory references.
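The two guarantees stated for close() (repeated calls are safe, and files on disk are kept) can be illustrated with ordinary file handles. HandleOwner is a hypothetical toy class used only for illustration; it does not reflect how mdxplain tracks its memmap handles.

```python
import os
import tempfile


class HandleOwner:
    """Toy owner of open file handles; a hypothetical stand-in."""

    def __init__(self, paths):
        self._handles = [open(p, "rb") for p in paths]

    def close(self):
        # Popping each handle means a second call finds nothing left to close.
        while self._handles:
            self._handles.pop().close()


with tempfile.TemporaryDirectory() as cache_dir:
    path = os.path.join(cache_dir, "features.dat")
    with open(path, "wb") as fh:
        fh.write(b"\x00" * 16)

    owner = HandleOwner([path])
    owner.close()
    owner.close()  # second call is a no-op
    file_survives = os.path.exists(path)  # closing does not delete the file
```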

save_to_single_file(save_path: str) -> None

Save complete pipeline to single pickle file.

This method saves the entire PipelineData object including all computed features, trajectories, clusterings, decompositions, and metadata to a single file. Memmap files remain in cache directory.

Parameters

save_path : str

Path where to save the complete pipeline

Returns

None

Saves the complete pipeline to the specified path
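Because the file is a pickle, the underlying mechanism is a standard serialize-and-restore round trip. The sketch below uses only the standard library and a plain dictionary as a stand-in for the pipeline state; the keys shown are illustrative and do not describe the real PipelineData layout.

```python
import os
import pickle
import tempfile

# Stand-in state; the real PipelineData object is far richer.
state = {"features": {"distances": [[0.1, 0.2], [0.3, 0.4]]}, "chunk_size": 2000}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "state.pkl")
    with open(path, "wb") as fh:
        pickle.dump(state, fh, protocol=pickle.HIGHEST_PROTOCOL)
    with open(path, "rb") as fh:
        restored = pickle.load(fh)
```

Unpickling can execute arbitrary code, so pickle files should only be loaded from sources you trust.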

Examples

>>> pipeline.save_to_single_file('complete_analysis.pkl')

static load_from_single_file(load_path: str, cache_dir: str | None = './cache', chunk_size: int = 1000, stride: int = 1, concat: bool = False, selection: str | None = None, show_progress: bool = True) -> PipelineManager

Load complete pipeline from single pickle file.

This static method creates a new PipelineManager instance with specified cache directory and loads pipeline state from file. Memmap files are expected in the cache directory.

Parameters

load_path : str

Path to the saved pipeline pickle file

cache_dir : str, optional

Fallback cache directory used only when saved cache metadata is unavailable. By default, the cache scope stored in the single-file payload is reused to preserve pipeline identity.

chunk_size : int, default=1000

Default chunk size for future operations

stride : int, default=1

Default stride for future trajectory loading

concat : bool, default=False

Default concat mode for future trajectory loading

selection : str, optional

Default MDTraj selection string for trajectories

show_progress : bool, default=True

Enable or disable progress bars globally (tqdm)

Returns

PipelineManager

New PipelineManager instance with loaded pipeline state

Examples

>>> loaded_pipeline = PipelineManager.load_from_single_file('analysis.pkl')
>>> loaded_pipeline.print_info()
>>> # Load with custom cache directory
>>> loaded_pipeline = PipelineManager.load_from_single_file(
...     'analysis.pkl',
...     cache_dir='./my_cache'
... )
>>> # Load with custom trajectory defaults for adding more data
>>> loaded_pipeline = PipelineManager.load_from_single_file(
...     'analysis.pkl',
...     stride=10
... )
>>> # Now load additional trajectories with stride=10
>>> loaded_pipeline.trajectory.load_trajectories('new_data/')

create_sharable_archive(archive_path: str, compression: str = 'zst', exclude_visualizations: bool = True, include_structure_files: bool = True, compression_level: int | None = None, zstd_threads: int | None = None, zstd_reserve_cores: int = 2, sha: bool | str = True, overwrite: bool = False) -> str

Create sharable compressed archive with pipeline and essential data.

Creates compressed tar archive containing pipeline pickle file and all necessary memmap files from cache directory. Excludes visualization outputs by default for smaller archive size.

Parameters

archive_path : str

Path for output archive (extension added automatically)

compression : str, default="zst"

Compression method: “zst”, “bz2”, or “gz”

exclude_visualizations : bool, default=True

If True, exclude PNG/PDF/SVG plot outputs

include_structure_files : bool, default=True

If True, include PDB/PML structure files

compression_level : int, optional

Compression level override (e.g. zst level 1-19).

zstd_threads : int, optional

Thread count for zstd compression. If None, chosen automatically.

zstd_reserve_cores : int, default=2

Number of CPU cores to keep free when zstd thread count is automatic.

sha : bool or str, default=True

If True, write <archive>.sha next to the created archive. When a string is provided, it is used as the explicit SHA256 output path.

overwrite : bool, default=False

If True, replace existing archive outputs. When False, existing archive or SHA256 outputs raise FileExistsError.

Returns

str

Path to created archive file

Examples

>>> # Minimal archive (only data)
>>> pipeline.create_sharable_archive("analysis.tar.zst")
>>> # Full archive (with visualizations)
>>> pipeline.create_sharable_archive(
...     "analysis_full.tar.zst",
...     exclude_visualizations=False
... )
>>> # Data only (no structure files)
>>> pipeline.create_sharable_archive(
...     "analysis_data.tar.zst",
...     include_structure_files=False
... )
>>> # Write the SHA256 checksum to an explicit path
>>> pipeline.create_sharable_archive(
...     "analysis.tar.zst",
...     sha="checksums/analysis.sha"
... )

Notes

  • zstd compression is optimized for fast runtime and low memory pressure

  • zstd compression is multithreaded by default

  • With use_memmap=False: Archive contains only pipeline.pkl + optional PDB/PML

  • With use_memmap=True: Archive contains pipeline.pkl + .dat files + zarr directories

  • Paths are preserved relative to cache directory

  • Memmap and zarr only included when use_memmap=True
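The SHA256 sidecar mentioned for the sha parameter can be reproduced with the standard library. This is a sketch under assumptions: the helper names are hypothetical, and the sidecar layout (hex digest, two spaces, file name, as written by the sha256sum tool) is assumed rather than confirmed for mdxplain.

```python
import hashlib
import os
import tempfile


def sha256_of_file(path, block_size=1 << 20):
    """Hash a file in fixed-size blocks so large archives are not read at once."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for block in iter(lambda: fh.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def write_sidecar(archive_path, sha_path=None):
    """Write '<hex>  <name>' next to the archive (the layout is an assumption)."""
    sha_path = sha_path or archive_path + ".sha"
    line = f"{sha256_of_file(archive_path)}  {os.path.basename(archive_path)}\n"
    with open(sha_path, "w", encoding="utf-8") as fh:
        fh.write(line)
    return sha_path


with tempfile.TemporaryDirectory() as tmp:
    archive = os.path.join(tmp, "analysis.tar.zst")
    with open(archive, "wb") as fh:
        fh.write(b"abc")  # placeholder bytes, not a real archive
    sidecar_path = write_sidecar(archive)
    with open(sidecar_path, encoding="utf-8") as fh:
        sidecar_hex = fh.read().split()[0]
```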

static load_from_archive(file_path: str, cache_dir: str = './cache', verify: bool = True, sha: str | None = None, download_url: str | None = None, overwrite: bool = False, chunk_size: int = 1000, stride: int = 1, concat: bool = False, selection: str | None = None, show_progress: bool = True) -> PipelineManager

Load pipeline from sharable archive.

Extracts compressed archive, moves cache files to specified cache directory, and loads pipeline state. Automatically repairs memmap file paths to point to new cache location.

Parameters

file_path : str

Local archive path, local download target, or remote archive URL.

cache_dir : str, default="./cache"

Target cache directory for extracted files

verify : bool, default=True

Whether to validate the archive via SHA256 before loading. When sha is provided, SHA256 verification is performed even if verify is False.

sha : str, optional

SHA256 input used for archive verification. May be provided as a raw SHA256 hex string, a local path to a .sha file, or a URL. When verification is enabled and sha is missing, loading fails.

download_url : str, optional

Remote source URL used when file_path should act as the local archive target. When omitted and file_path is itself a URL, downloads are stored under <cache_dir>/downloads/.

overwrite : bool, default=False

For remote URLs, controls whether an existing local target file is replaced. When False, an existing downloaded file is reused and a warning is emitted.

chunk_size : int, default=1000

Default chunk size for future operations

stride : int, default=1

Default stride for future trajectory loading

concat : bool, default=False

Default concatenation setting for future trajectory loading

selection : str, optional

Default MDTraj selection string for trajectories

show_progress : bool, default=True

Enable or disable progress bars globally (tqdm)

Returns

PipelineManager

Loaded pipeline instance
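The interplay of file_path and download_url described in the parameters can be read as a small resolution rule. The sketch below is one possible reading, not the library's code: resolve_download is a hypothetical helper, and deriving the local file name from the last URL path segment is an assumption.

```python
import os
from urllib.parse import urlparse


def is_url(value):
    """Treat only http(s) strings as remote sources."""
    return urlparse(value).scheme in ("http", "https")


def resolve_download(file_path, cache_dir="./cache", download_url=None):
    """Return (source_url, local_target); source_url is None for local archives."""
    if download_url is not None:
        # file_path acts as the local download target.
        return download_url, file_path
    if is_url(file_path):
        name = os.path.basename(urlparse(file_path).path)  # assumed naming rule
        return file_path, os.path.join(cache_dir, "downloads", name)
    return None, file_path


local_case = resolve_download("analysis.tar.zst")
url_case = resolve_download("https://example.org/analysis.tar.zst")
explicit_case = resolve_download(
    "local_analysis.tar.zst",
    download_url="https://example.org/analysis.tar.zst",
)
```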

Examples

>>> # Download a remote archive to a local target, with verification disabled
>>> pipeline = PipelineManager.load_from_archive(
...     "local_analysis.tar.zst",
...     cache_dir="./my_cache",
...     download_url="https://example.org/analysis.tar.zst",
...     verify=False
... )
>>> # Load with SHA256 verification from a sidecar file
>>> pipeline = PipelineManager.load_from_archive(
...     "analysis.tar.zst",
...     verify=True,
...     sha="analysis.tar.zst.sha"
... )

Notes

  • Extracts to temporary directory

  • Moves cache files to specified cache_dir

  • Automatically repairs memmap paths for portability

  • Cache directory created if it doesn’t exist
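The SHA256 check that runs before loading can be approximated with hashlib. This is a hedged sketch: the helper names are hypothetical, URL inputs for sha are left out, and the sidecar file is assumed to carry the hex digest as its first whitespace-separated token.

```python
import hashlib
import os
import tempfile


def sha256_of_file(path, block_size=1 << 20):
    """Hash the file in blocks to keep memory use flat."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for block in iter(lambda: fh.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def expected_digest(sha):
    """Accept a raw hex digest or a local sidecar path (URL handling omitted)."""
    if os.path.isfile(sha):
        with open(sha, encoding="utf-8") as fh:
            sha = fh.read().split()[0]  # assumes the digest is the first token
    return sha.strip().lower()


def archive_matches(archive_path, sha):
    """Return True when the archive's SHA256 equals the expected digest."""
    return sha256_of_file(archive_path) == expected_digest(sha)


with tempfile.TemporaryDirectory() as tmp:
    archive = os.path.join(tmp, "analysis.tar.zst")
    with open(archive, "wb") as fh:
        fh.write(b"placeholder bytes, not a real archive")
    good = sha256_of_file(archive)

    matches_hex = archive_matches(archive, good.upper())   # case-insensitive hex
    rejects_wrong = not archive_matches(archive, "0" * 64)

    sidecar = archive + ".sha"
    with open(sidecar, "w", encoding="utf-8") as fh:
        fh.write(good + "  analysis.tar.zst\n")
    matches_sidecar = archive_matches(archive, sidecar)
```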

print_info() -> None

Print comprehensive pipeline information.

This method prints information from ALL managers to provide a complete overview of the pipeline state.

Returns

None

Prints comprehensive pipeline information to console

Examples

>>> pipeline.print_info()
======= PIPELINE INFORMATION =======

--- Trajectory Data ---
Loaded 3 trajectories:

[0] system1_traj1: 1000 frames
[1] system1_traj2: 1500 frames
[2] system2_traj1: 800 frames

--- Feature Data ---
Feature Types: 2 (distances, contacts)

--- Clustering Data ---
Clustering Names: 1 (conformations)

(… information from all managers …)

update_config(chunk_size: int = None, cache_dir: str = None, use_memmap: bool = None)

Update pipeline configuration parameters at runtime.

Allows modification of key configuration parameters after pipeline initialization. Changes are propagated to all managers and the central PipelineData container.

Parameters

chunk_size : int, optional

New chunk size for processing operations. Must be positive integer.

cache_dir : str, optional

New cache directory path. Directory will be created if it doesn’t exist.

use_memmap : bool, optional

Whether to use memory mapping for data storage operations.

Returns

None

Updates configuration in all components

Raises

ValueError

If chunk_size is not a positive integer

OSError

If cache_dir cannot be created
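The two documented failure modes can be sketched as a standalone validation step. validate_config_update is a hypothetical helper that mirrors the Raises section; rejecting bool values for chunk_size is an assumption of this sketch, and propagation to the managers is not modelled.

```python
import os
import tempfile


def validate_config_update(chunk_size=None, cache_dir=None):
    """Validate arguments as the Raises section describes; return what changed."""
    if chunk_size is not None:
        # bool is a subclass of int, so it is rejected explicitly (assumption).
        is_int = isinstance(chunk_size, int) and not isinstance(chunk_size, bool)
        if not is_int or chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
    if cache_dir is not None:
        os.makedirs(cache_dir, exist_ok=True)  # raises OSError on failure
    updates = {"chunk_size": chunk_size, "cache_dir": cache_dir}
    return {key: value for key, value in updates.items() if value is not None}


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "mdx_cache")
    applied = validate_config_update(chunk_size=5000, cache_dir=target)
    created = os.path.isdir(target)  # the directory was created on demand
```

Leaving a parameter as None keeps its current value, which is why only the non-None entries are returned.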

Examples

Update chunk size for better memory management:

>>> pipeline.update_config(chunk_size=5000)

Change cache directory and enable memory mapping:

>>> pipeline.update_config(cache_dir="/tmp/mdx_cache", use_memmap=True)

get_config() -> dict

Get current pipeline configuration parameters.

Returns the current configuration settings that are active across all pipeline components.

Returns

dict

Dictionary containing current configuration values

Examples

Check current configuration:

>>> pipeline = PipelineManager(chunk_size=1000, use_memmap=True)
>>> config = pipeline.get_config()
>>> print(f"Using chunk size: {config['chunk_size']}")
>>> print(f"Memory mapping: {config['use_memmap']}")