Trajectory Manager
Trajectory management module for loading and manipulating MD trajectory data.
- class mdxplain.trajectory.manager.trajectory_manager.TrajectoryManager(stride: int = 1, concat: bool = False, selection: str | None = None, cache_dir: str = './cache', use_memmap: bool = False, chunk_size: int = 1000)
Manager for pure trajectory data objects without feature dependencies.
Provides methods to load, add, remove, slice, and select atoms in MD trajectories. This manager operates on TrajectoryData objects and does not depend on any feature data. It is designed to be used both standalone and within a pipeline context.
It handles various trajectory formats, automatic format detection, and provides a consistent interface for working with trajectory data.
It can load multiple trajectories, apply selections, and manage memory-efficient representations using DaskMDTrajectory.
It can load trajectories from directories and nested directories or lists of files, handle topology files, and apply MDTraj selection strings.
- __init__(stride: int = 1, concat: bool = False, selection: str | None = None, cache_dir: str = './cache', use_memmap: bool = False, chunk_size: int = 1000)
Initialize trajectory manager.
Parameters
- stride : int, default=1
Load every stride-th frame from trajectories. Use values > 1 to reduce memory usage and computation time by subsampling frames.
- concat : bool, default=False
Whether to concatenate multiple trajectories per system into single trajectory objects. Useful when dealing with trajectory splits.
- selection : str, optional
MDTraj selection string to apply to all loaded trajectories. See: https://mdtraj.org/1.9.4/atom_selection.html
- use_memmap : bool, default=False
Whether to use memory-mapped DaskMDTrajectory for large files. When True, trajectories are loaded as DaskMDTrajectory objects for efficient memory usage.
- chunk_size : int, default=1000
Number of frames per DaskMDTrajectory chunk. Only used when use_memmap=True.
- cache_dir : str, default='./cache'
Directory for caching intermediate results and Zarr files.
Returns
- None
Initializes the TrajectoryManager instance with the given parameters
Examples
>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')
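To make the effect of stride and chunk_size concrete, the sketch below computes which frame indices survive subsampling and how they could be grouped into chunks. It is plain Python that only illustrates the arithmetic. The helper names are hypothetical and not part of mdxplain, and the actual chunk layout used by DaskMDTrajectory is an assumption here.

```python
def strided_frames(n_frames, stride=1):
    """Return the frame indices kept when loading every stride-th frame."""
    if stride < 1:
        raise ValueError("stride must be >= 1")
    return list(range(0, n_frames, stride))


def split_into_chunks(frames, chunk_size=1000):
    """Group frame indices into consecutive chunks of at most chunk_size."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")
    return [frames[i:i + chunk_size] for i in range(0, len(frames), chunk_size)]


# A 10,000-frame trajectory loaded with stride=4 keeps 2,500 frames,
# which fit into three chunks of up to 1,000 frames each.
kept = strided_frames(n_frames=10_000, stride=4)
chunks = split_into_chunks(kept, chunk_size=1000)
print(len(kept), len(chunks), len(chunks[-1]))  # 2500 3 500
```

A larger stride shrinks memory use linearly, while chunk_size only changes how the remaining frames are partitioned.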
- load_trajectories(pipeline_data: PipelineData, data_input: str | List[Any], concat: bool | None = None, stride: int | None = 1, selection: str | None = None, force: bool = False) None
Load molecular dynamics trajectories from files or directories into PipelineData.
This method handles loading of MD trajectories in various formats (e.g., .xtc, .dcd, .trr) along with their topology files. The loading is performed using the TrajectoryLoadHelper class which supports automatic format detection and multiple trajectory handling.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.load_trajectories('../data')  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.load_trajectories(pipeline_data, '../data')  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data container where trajectories will be stored
- data_input : str or list
Path to directory containing trajectory files, or list of trajectory file paths. When a directory is provided, all supported trajectory files in that directory will be loaded.
- concat : bool, optional
Whether to concatenate multiple trajectories per system into single trajectory objects. If None, uses manager default.
- stride : int, optional
Load every stride-th frame from trajectories. If None, uses manager default.
- selection : str, optional
MDTraj selection string to apply to all loaded trajectories. If None, uses manager default.
- force : bool, default=False
Whether to force loading even when features have been calculated. When True, existing features become invalid and should be recalculated.
Returns
- None
Loads trajectories into pipeline_data.trajectory_data and sets up topology/names
Examples
Pipeline mode (automatic injection):
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.load_trajectories('../data')
Standalone mode:
>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')
>>> # Load every 10th frame, protein atoms only
>>> traj_manager.load_trajectories(
...     pipeline_data, '../data', stride=10, selection='protein'
... )
Notes
Supported formats depend on MDTraj capabilities
Topology files (.pdb, .gro, .psf) should be in the same directory
Large trajectories benefit from striding to reduce memory usage
Selection is applied to all trajectories after loading
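The directory handling described above can be pictured with a small sketch that walks a directory tree and separates trajectory files from topology files by extension. The function name and the extension sets are assumptions for illustration, taken from the formats named in this section. This is not the behavior of TrajectoryLoadHelper, which may accept more formats and group files differently.

```python
from pathlib import Path
import tempfile

# Extensions named in this section; the real loader may accept more.
TRAJECTORY_EXTS = {".xtc", ".dcd", ".trr"}
TOPOLOGY_EXTS = {".pdb", ".gro", ".psf"}


def discover_files(root):
    """Map each directory name under root to its trajectory and topology files."""
    found = {}
    for path in sorted(Path(root).rglob("*")):
        if not path.is_file():
            continue
        suffix = path.suffix.lower()
        if suffix in TRAJECTORY_EXTS:
            kind = "trajectories"
        elif suffix in TOPOLOGY_EXTS:
            kind = "topologies"
        else:
            continue  # unrelated files are ignored
        entry = found.setdefault(
            path.parent.name, {"trajectories": [], "topologies": []}
        )
        entry[kind].append(path.name)
    return found


# Build a throwaway directory tree to demonstrate the scan.
file_names = (
    "system1/run1.xtc", "system1/run2.xtc", "system1/top.pdb",
    "system2/run1.dcd", "system2/top.gro", "system2/notes.txt",
)
with tempfile.TemporaryDirectory() as tmp:
    for name in file_names:
        file_path = Path(tmp, name)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.touch()
    layout = discover_files(tmp)

print(layout)
```

Keeping one topology file next to the trajectories of each system, as the notes recommend, makes this kind of pairing unambiguous.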
- add_trajectory(pipeline_data: PipelineData, data_input: str | List[Any], concat: bool | None = None, stride: int | None = 1, selection: str | None = None) None
Add molecular dynamics trajectories to TrajectoryData object.
This method works like load_trajectories but appends new trajectories instead of replacing existing ones. Useful for loading additional trajectory data without losing previously loaded trajectories.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.add_trajectory('../data2')  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.add_trajectory(pipeline_data, '../data2')  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data object
- data_input : str or list
Path to directory containing trajectory files, or list of trajectory file paths. When a directory is provided, all supported trajectory files in that directory will be loaded.
- concat : bool, optional
Whether to concatenate multiple trajectories per system into single trajectory objects. If None, uses manager default.
- stride : int, optional
Load every stride-th frame from trajectories. If None, uses manager default.
- selection : str, optional
MDTraj selection string to apply to all newly loaded trajectories. If None, uses manager default.
Returns
- None
Appends new trajectories to existing trajectory list in pipeline_data
Examples
>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')
>>> traj_manager.add_trajectory(pipeline_data, '../data2')
Raises
- ValueError
If no trajectories are currently loaded
Notes
New trajectories are appended to existing ones
Trajectory names are also appended to maintain consistency
Selection is only applied to newly loaded trajectories
Existing trajectories remain unchanged
- remove_trajectory(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, force: bool = False) None
Remove specified trajectories from the loaded trajectory list.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.remove_trajectory([0, 1])  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.remove_trajectory(pipeline_data, [0, 1])  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data object
- traj_selection : int, str, list, or “all”
Trajectory selection (required). Options:
int: single trajectory by index (e.g., 0)
str: trajectory name
list: multiple indices/names (can be mixed)
“all”: all loaded trajectories
- force : bool, default=False
Whether to force removal even when features have been calculated. When True, existing features become invalid and should be recalculated.
Returns
- None
Removes trajectories from pipeline_data
Examples
>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')
>>> traj_manager.remove_trajectory(pipeline_data, [0, 1])
Raises
- ValueError
If trajectories are not loaded or if traj_selection contains invalid indices/names
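The traj_selection forms listed above (index, name, mixed list, or “all”) can be reduced to a list of indices before anything is removed. The following is a minimal sketch of such a resolution step. The function resolve_selection is hypothetical, not an mdxplain API, and the real manager may support further forms such as tags or patterns.

```python
def resolve_selection(selection, names):
    """Translate an int, a name, "all", or a mixed list into trajectory indices."""
    if isinstance(selection, str) and selection == "all":
        return list(range(len(names)))
    items = selection if isinstance(selection, list) else [selection]
    indices = []
    for item in items:
        if isinstance(item, int) and not isinstance(item, bool):
            if not 0 <= item < len(names):
                raise ValueError(f"Invalid trajectory index: {item}")
            index = item
        elif isinstance(item, str):
            if item not in names:
                raise ValueError(f"Unknown trajectory name: {item}")
            index = names.index(item)
        else:
            raise ValueError(f"Unsupported selector: {item!r}")
        if index not in indices:  # keep each trajectory only once
            indices.append(index)
    return indices


names = ["apo_rep1", "apo_rep2", "holo_rep1"]
print(resolve_selection([0, "holo_rep1"], names))  # [0, 2]
print(resolve_selection("all", names))             # [0, 1, 2]
```

Validating every entry before any removal happens means an invalid index or name raises ValueError without leaving the trajectory list half-modified.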
- slice_traj(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, frames: int | slice | List[int] | None = None, stride: int | None = 1, cut: int | None = None, data_selector: str | None = None, force: bool = False, inplace: bool = True) None
Slice trajectories using frame ranges, stride, OR DataSelector.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.slice_traj(traj_selection="all", frames=1000)  # NO pipeline_data parameter
>>> pipeline.trajectory.slice_traj(traj_selection="all", data_selector="folded_frames")  # Use DataSelector
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.slice_traj(pipeline_data, frames=1000, traj_selection="all")  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data object
- traj_selection : int, str, list, or “all”
Selection of trajectories to process (required):
int: trajectory index
str: trajectory name
list: list of indices/names
“all”: all loaded trajectories
- frames : int, slice, list, optional
Frame specification for slicing:
int: keep the first N frames, i.e. indices 0 to frames-1 (e.g., frames=1000 → frames 0-999)
slice: direct slice object (e.g., slice(100, 500) → frames 100-499)
list: specific frame indices (e.g., [0, 10, 20, 30])
Ignored if data_selector is provided.
- stride : int, optional
Take every stride-th frame. Use values > 1 to subsample frames. If None, uses manager default.
- cut : int, optional
Frame number after which to cut trajectories. Frames after this index will be removed. Applied after frame selection and stride.
- data_selector : str, optional
Name of DataSelector to use for frame selection. If provided, overrides frames/stride parameters and uses the selected frames from the DataSelector.
- force : bool, default=False
Whether to force slicing even when features have been calculated. When True, existing features become invalid and should be recalculated.
- inplace : bool, default=True
Whether to perform the operation in-place. For DaskMDTrajectory, slicing returns a lazy view which is updated in the pipeline.
Returns
- None
Modifies trajectories and updates pipeline_data.
Examples
>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')
>>> traj_manager.slice_traj(pipeline_data, traj_selection="all", frames=1000)
Raises
- ValueError
If trajectories are not loaded, if traj_selection contains invalid indices/names, or if the DataSelector does not exist
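The three accepted forms of frames can be illustrated with ordinary Python indexing. The sketch below resolves a frames value and a stride to explicit frame indices. The helper frame_indices is hypothetical, and applying stride to the already selected frames is an assumption about ordering. The cut and data_selector parameters are deliberately left out.

```python
def frame_indices(n_frames, frames=None, stride=1):
    """Resolve a frames spec (int, slice, list, or None) plus stride to indices."""
    all_frames = list(range(n_frames))
    if frames is None:
        selected = all_frames
    elif isinstance(frames, int):
        selected = all_frames[:frames]               # frames=1000 -> 0..999
    elif isinstance(frames, slice):
        selected = all_frames[frames]                # slice(100, 500) -> 100..499
    else:
        selected = [all_frames[i] for i in frames]   # explicit frame indices
    return selected[::stride]


first_thousand = frame_indices(2000, frames=1000)
window = frame_indices(2000, frames=slice(100, 500))
picked = frame_indices(2000, frames=[0, 10, 20, 30], stride=2)
print(first_thousand[-1], window[0], window[-1], picked)  # 999 100 499 [0, 20]
```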
- select_atoms(pipeline_data: PipelineData, selection: str, traj_selection: int | str | List[int | str] | 'all', force: bool = False, inplace: bool = True) None
Apply atom selection to trajectories using MDTraj selection syntax.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.select_atoms("protein", "all")  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.select_atoms(pipeline_data, "protein", "all")  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data object
- traj_selection : int, str, list, or “all”
Selection of trajectories to process:
int: trajectory index
str: trajectory name or “all” for all trajectories
list: list of indices/names
- selection : str
MDTraj selection string (e.g., “protein”, “backbone”, “resid 10 to 50”). See: https://mdtraj.org/1.9.4/atom_selection.html
- force : bool, default=False
Whether to force atom selection even when features have been calculated. When True, existing features become invalid and should be recalculated.
- inplace : bool, default=True
Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.
Returns
- None
Applies atom selection to trajectories and updates pipeline_data
Examples
>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')
>>> traj_manager.select_atoms(pipeline_data, "protein", "all")
>>> # Select atoms from specific trajectories
>>> traj_manager.select_atoms(pipeline_data, "protein", [0, 1, 2])
Raises
- ValueError
If trajectories are not loaded or if selection or traj_selection contains invalid values
- add_labels(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | 'all', fragment_definition: str | Dict[str, Tuple[int, int]] | None = None, fragment_type: str | Dict[str, str] | None = None, fragment_molecule_name: str | Dict[str, str] | None = None, consensus: bool = False, aa_short: bool = False, try_web_lookup: bool = True, force: bool = False, **nomenclature_kwargs) None
Add nomenclature labels to selected trajectories.
This method creates consensus labels for MD trajectories using different mdciao nomenclature systems (GPCR, CGN, KLIFS). Different systems can have different nomenclatures by applying labels to specific trajectory selections.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.add_labels([0, 1], fragment_definition="receptor")  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.add_labels(pipeline_data, [0, 1], fragment_definition="receptor")  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data object
- traj_selection : int, str, list, or “all”
Selection of trajectories to add labels to:
int: trajectory index
str: trajectory name
list: list of indices/names
“all”: all loaded trajectories
- fragment_definition : str or dict, default None
If string, uses that as fragment name for entire topology. If dict, maps fragment names to residue ranges: {“cgn_a”: (0, 348), “beta2”: (400, 684)}. Only required when consensus=True.
- fragment_type : str or dict, default None
If string, uses that nomenclature type for all fragments. If dict, maps fragment names to nomenclature types: {“cgn_a”: “cgn”, “beta2”: “gpcr”}. Use mdciao nomenclature types. Allowed types: gpcr, cgn, klifs. Only required when consensus=True.
- fragment_molecule_name : str or dict, default None
If string, uses that molecule name for all fragments. If dict, maps fragment names to molecule names: {“cgn_a”: “gnas2_bovin”, “beta2”: “adrb2_human”}. Use the UniProt entry name (not accession ID) for GPCR/CGN labelers, or KLIFS string for KLIFS labelers. See https://www.uniprot.org/help/difference_accession_entryname for UniProt naming conventions. See https://proteinformatics.uni-leipzig.de/mdciao/api/generated/generated/mdciao.nomenclature.LabelerKLIFS.html#mdciao.nomenclature.LabelerKLIFS for KLIFS naming conventions. Only required when consensus=True.
- consensus : bool, default False
Whether to use consensus labeling (combines AA codes with nomenclature labels). If False, only returns amino acid labels without nomenclature.
- aa_short : bool, default False
Whether to use short amino acid names (T vs THR)
- verbose : bool, default False
Whether to enable verbose output from labelers
- try_web_lookup : bool, default True
Whether to try web lookup for molecule data
- force : bool, default False
Whether to force label addition even when labels already exist. When True, existing labels will be overwritten. When False, raises ValueError if labels exist.
- write_to_disk : bool, default False
Whether to write cache files to disk
- cache_folder : str, default “./cache”
Folder for cache files
- nomenclature_kwargs
Additional keyword arguments passed to the mdciao labelers
Returns
None
Raises
- ValueError
If trajectories are not loaded
- ValueError
If traj_selection contains invalid indices or names
- ValueError
If nomenclature labels already exist for selected trajectories and force=False
- ValueError
If fragment_definition is required when consensus=True
- ValueError
If fragment_type is required when consensus=True
- ValueError
If fragment_molecule_name is required when consensus=True
- ValueError
If fragment_definition is not a string or dictionary
- ValueError
If fragment_type is not a string or dictionary
- ValueError
If fragment_molecule_name is not a string or dictionary
Notes
This method wraps mdciao consensus nomenclature systems: https://proteinformatics.uni-leipzig.de/mdciao/api/generated/mdciao.nomenclature.html
Supported fragment types: gpcr, cgn, klifs
Examples
>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')
>>> # Add labels to specific trajectories (different systems)
>>> traj_manager.add_labels(
...     pipeline_data, [0, 1], fragment_definition="receptor", fragment_type="gpcr",
...     fragment_molecule_name="adrb2_human", consensus=True
... )
>>> traj_manager.add_labels(
...     pipeline_data, [2, 3], fragment_definition="kinase", fragment_type="klifs",
...     fragment_molecule_name="abl1_human", consensus=True
... )
>>> # Add labels to all trajectories
>>> traj_manager.add_labels(pipeline_data, "all", fragment_definition="protein")
>>> # Adding labels again requires force=True
>>> traj_manager.add_labels(pipeline_data, "all", force=True)  # Overwrites existing labels
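The rules for the three fragment_* arguments (string or dict, required only with consensus=True) can be summarized in a short validation sketch. Both helpers below are hypothetical and written only from the parameter and Raises descriptions above. They are not the checks mdxplain performs internally, and the error messages are made up.

```python
def check_fragment_arguments(consensus, **fragment_args):
    """Raise ValueError for missing or mistyped fragment_* arguments."""
    for name, value in fragment_args.items():
        if value is None:
            if consensus:
                raise ValueError(f"{name} is required when consensus=True")
            continue
        if not isinstance(value, (str, dict)):
            raise ValueError(f"{name} must be a string or dictionary")


def as_fragment_dict(value, fragment_names):
    """Expand a single string into a per-fragment dict; pass dicts through."""
    if isinstance(value, dict):
        return dict(value)
    return {fragment: value for fragment in fragment_names}


# Dict form: two fragments with different nomenclature systems.
check_fragment_arguments(
    consensus=True,
    fragment_definition={"cgn_a": (0, 348), "beta2": (400, 684)},
    fragment_type={"cgn_a": "cgn", "beta2": "gpcr"},
    fragment_molecule_name={"cgn_a": "gnas2_bovin", "beta2": "adrb2_human"},
)

# String form: one value shared by every fragment.
types = as_fragment_dict("gpcr", ["receptor"])
print(types)  # {'receptor': 'gpcr'}
```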
- add_tags(pipeline_data: PipelineData, trajectory_selector: int | str | List[Any] | range | Dict[Any, List[str]], tags: List[str] | None = None) None
Add tags to trajectories using flexible selectors.
This method supports single trajectories, multiple trajectories, pattern matching, and bulk assignment using dictionaries. It provides a powerful interface for managing trajectory tags in complex scenarios.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.add_tags(0, ["system_A"])  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.add_tags(pipeline_data, 0, ["system_A"])  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data object
- trajectory_selector : int, str, list, range, dict
Flexible selector for trajectories:
int: single trajectory by index (e.g., 0)
str: single trajectory by name or pattern (e.g., “traj1”, “system_*”)
Supports multiple string formats:
Range: “0-3”, “id 0-3” → [0, 1, 2, 3]
Comma list: “1,2,4,5”, “id 1,2,4,5” → [1, 2, 4, 5]
Single number: “7”, “id 7” → [7]
Pattern: “system_*” → fnmatch pattern matching
list: multiple selectors (e.g., [0, 1, “special_traj”, range(5,8)])
range: range of indices (e.g., range(0, 4))
dict: bulk assignment {selector: tags, …}
- tags : list, optional
List of tag strings to add. Required when trajectory_selector is not dict. Ignored when trajectory_selector is dict.
Returns
- None
Adds tags to selected trajectories and rebuilds frame mapping
Examples
>>> # Single trajectory
>>> traj_manager.add_tags(pipeline_data, 0, ["system_A", "biased"])
>>> traj_manager.add_tags(pipeline_data, "traj1", ["system_B"])
>>> # Multiple trajectories
>>> traj_manager.add_tags(pipeline_data, [0, 1, 2], ["control"])
>>> traj_manager.add_tags(pipeline_data, range(0, 4), ["batch_1"])
>>> # Pattern matching
>>> traj_manager.add_tags(pipeline_data, "system_2_*", ["system_B"])
>>> # Complex nested selectors
>>> traj_manager.add_tags(pipeline_data, [range(0, 3), "system_2_*"], ["mixed"])
>>> # Bulk assignment with dict (keys must be hashable: use range or str selectors, not lists)
>>> traj_manager.add_tags(pipeline_data, {
...     range(0, 4): ["system_A", "unbiased"],
...     range(4, 8): ["control"],
...     "special_traj": ["control"],
...     "system_2_*": ["system_B", "production"]
... })
Raises
- ValueError
If tags is None when trajectory_selector is not dict
- ValueError
If trajectory selector contains invalid indices or names
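The string selector formats listed under trajectory_selector ("0-3", "id 1,2,4,5", "7", and fnmatch patterns) can be sketched as a small parser. The function parse_string_selector is a hypothetical illustration built only from the format list above, not the parser mdxplain uses. It does not validate that numeric indices exist.

```python
import re
from fnmatch import fnmatch


def parse_string_selector(selector, names):
    """Resolve a string selector to a list of trajectory indices."""
    text = selector.strip()
    # The "id " prefix is optional for the numeric formats.
    numeric = text[3:].strip() if text.startswith("id ") else text

    range_match = re.fullmatch(r"(\d+)-(\d+)", numeric)
    if range_match:  # "0-3" is inclusive on both ends
        start, stop = int(range_match.group(1)), int(range_match.group(2))
        return list(range(start, stop + 1))

    if re.fullmatch(r"\d+(\s*,\s*\d+)*", numeric):  # "7" or "1,2,4,5"
        return [int(part) for part in numeric.split(",")]

    # Anything else is treated as a trajectory name or fnmatch pattern.
    return [i for i, name in enumerate(names) if fnmatch(name, text)]


names = ["system_1_a", "system_2_a", "system_2_b", "control"]
print(parse_string_selector("0-3", names))         # [0, 1, 2, 3]
print(parse_string_selector("id 1,2,4,5", names))  # [1, 2, 4, 5]
print(parse_string_selector("system_2_*", names))  # [1, 2]
```

Because every string form resolves to a plain index list, strings and range objects can serve as dictionary keys for bulk assignment, which Python lists cannot.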
- set_tags(pipeline_data: PipelineData, trajectory_selector: int | str | List[Any] | range | Dict[Any, List[str]], tags: List[str] | None = None) None
Set (replace) tags for trajectories using flexible selectors.
This method completely replaces existing tags instead of merging them. It supports the same flexible selector system as add_tags() but provides replacement semantics for tag management scenarios where you need to reset or completely change trajectory tags.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.set_tags(0, ["system_A"])  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.set_tags(pipeline_data, 0, ["system_A"])  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data object
- trajectory_selector : int, str, list, range, dict
Flexible selector for trajectories:
int: single trajectory by index (e.g., 0)
str: single trajectory by name or pattern (e.g., “traj1”, “system_*”)
Supports multiple string formats:
Range: “0-3”, “id 0-3” → [0, 1, 2, 3]
Comma list: “1,2,4,5”, “id 1,2,4,5” → [1, 2, 4, 5]
Single number: “7”, “id 7” → [7]
Pattern: “system_*” → fnmatch pattern matching
list: multiple selectors (e.g., [0, 1, “special_traj”, range(5,8)])
range: range of indices (e.g., range(0, 4))
dict: bulk assignment {selector: tags, …}
- tags : list, optional
List of tag strings to set. Required when trajectory_selector is not dict. Ignored when trajectory_selector is dict.
Returns
- None
Sets tags for selected trajectories and rebuilds frame mapping
Examples
>>> # Replace tags for single trajectory
>>> traj_manager.add_tags(pipeline_data, 0, ["old_tag", "other"])
>>> traj_manager.set_tags(pipeline_data, 0, ["new_tag"])  # Replaces both old tags
>>> # Reset multiple trajectories to same tags
>>> traj_manager.set_tags(pipeline_data, [0, 1, 2], ["reset", "control"])
>>> # Clear all tags (set to empty)
>>> traj_manager.set_tags(pipeline_data, "all", [])
>>> # Bulk replacement with dict (keys must be hashable: use range or str selectors, not lists)
>>> traj_manager.set_tags(pipeline_data, {
...     range(0, 2): ["system_A", "production"],
...     range(2, 4): ["system_B", "test"],
...     "control_*": ["control"]
... })
Raises
- ValueError
If tags is None when trajectory_selector is not dict
- ValueError
If trajectory selector contains invalid indices or names
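The difference between the merge semantics of add_tags() and the replacement semantics of set_tags() is easiest to see on a plain dictionary of tag lists. The two functions below are stand-ins written for this illustration, not the mdxplain methods. Skipping duplicate tags during a merge is an assumption.

```python
def add_tags(tag_map, index, tags):
    """Merge new tags into the existing list, keeping order and skipping duplicates."""
    current = tag_map.setdefault(index, [])
    for tag in tags:
        if tag not in current:
            current.append(tag)


def set_tags(tag_map, index, tags):
    """Discard whatever was stored and keep only the given tags."""
    tag_map[index] = list(tags)


tag_map = {}
add_tags(tag_map, 0, ["old_tag", "other"])
add_tags(tag_map, 0, ["other", "extra"])
merged = list(tag_map[0])          # ['old_tag', 'other', 'extra']

set_tags(tag_map, 0, ["new_tag"])
replaced = list(tag_map[0])        # ['new_tag']
print(merged, replaced)
```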
- rename_trajectories(pipeline_data: PipelineData, name_mapping: Dict[int | str, str] | List[str]) None
Rename trajectory names.
This method allows renaming trajectory names for better organization and more descriptive identification. Supports both dictionary-based mapping and positional list assignment.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.rename_trajectories({0: "new_name"})  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.rename_trajectories(pipeline_data, {0: "new_name"})  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data object
- name_mapping : dict or list
Mapping for trajectory names:
dict: {old_name_or_index: new_name, …} for selective renaming
list: [new_name1, new_name2, …] for positional assignment
Returns
- None
Renames trajectories and rebuilds frame tag mapping
Examples
>>> # Dictionary-based renaming
>>> traj_manager.rename_trajectories(pipeline_data, {
...     0: "system_A_replicate_1",
...     "old_traj_name": "system_B_replicate_1",
...     2: "control_experiment"
... })
>>> # Positional list assignment
>>> traj_manager.rename_trajectories(pipeline_data, [
...     "system_A_rep1",
...     "system_A_rep2",
...     "system_B_rep1",
...     "control"
... ])
Raises
- ValueError
If no trajectories are loaded, mapping is invalid, or references invalid trajectories
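The two name_mapping forms behave differently: a dict renames only the entries it mentions, while a list assigns names by position. A minimal sketch of that logic follows. The function rename is hypothetical, and requiring the list to contain exactly one name per trajectory is an assumption about what counts as an invalid mapping.

```python
def rename(names, name_mapping):
    """Return a new name list after applying a dict or positional list mapping."""
    if isinstance(name_mapping, list):
        if len(name_mapping) != len(names):
            raise ValueError("List mapping must provide one name per trajectory")
        return list(name_mapping)

    renamed = list(names)
    for key, new_name in name_mapping.items():
        if isinstance(key, int):
            if not 0 <= key < len(names):
                raise ValueError(f"Invalid trajectory index: {key}")
            renamed[key] = new_name
        else:
            if key not in names:
                raise ValueError(f"Unknown trajectory name: {key}")
            renamed[names.index(key)] = new_name
    return renamed


names = ["traj_0", "old_traj_name", "traj_2"]
selective = rename(names, {0: "system_A_rep1", "old_traj_name": "system_B_rep1"})
positional = rename(names, ["a", "b", "c"])
print(selective)   # ['system_A_rep1', 'system_B_rep1', 'traj_2']
print(positional)  # ['a', 'b', 'c']
```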
- reset_trajectory_data(pipeline_data: PipelineData) None
Reset the trajectory data object to empty state.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.reset_trajectory_data()  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.reset_trajectory_data(pipeline_data)  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data object
Returns
- None
Resets trajectory data to empty state
Examples
>>> traj_manager.reset_trajectory_data(pipeline_data)
- save(pipeline_data: PipelineData, save_path: str) None
Save trajectory data to disk.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.save('trajectory_backup.pkl')  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.save(pipeline_data, 'trajectory_backup.pkl')  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data container with trajectory data
- save_path : str
Path where to save the trajectory data
Returns
- None
Saves the trajectory data to the specified path
Examples
>>> trajectory_manager.save(pipeline_data, 'trajectory_backup.pkl')
- load(pipeline_data: PipelineData, load_path: str) None
Load trajectory data from disk.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.load('trajectory_backup.pkl')  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.load(pipeline_data, 'trajectory_backup.pkl')  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data container to load trajectory data into
- load_path : str
Path to the saved trajectory data file
Returns
- None
Loads the trajectory data from the specified path
Examples
>>> trajectory_manager.load(pipeline_data, 'trajectory_backup.pkl')
- print_info(pipeline_data: PipelineData) None
Print trajectory information.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.print_info()  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.print_info(pipeline_data)  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data container with trajectory data
Returns
- None
Prints trajectory information to console
Examples
>>> trajectory_manager.print_info(pipeline_data)
=== TrajectoryData ===
Loaded 3 trajectories:
[0] system1_prot_traj1: 1000 frames, tags: ['system_A', 'biased']
[1] system1_prot_traj2: 1500 frames, tags: ['system_A', 'unbiased']
[2] system2_prot_traj1: 800 frames, tags: ['system_B', 'biased']
- select_trajs(pipeline_data: PipelineData, data_selector: str) List[DaskMDTrajectory | md.Trajectory]
Create new trajectory objects from DataSelector frames.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> selected = pipeline.trajectory.select_trajs("folded_frames")  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> selected = manager.select_trajs(pipeline_data, "folded_frames")  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data with trajectories and DataSelector
- data_selector : str
Name of DataSelector to use
Returns
- List[Union[DaskMDTrajectory, md.Trajectory]]
List of new trajectory objects with selected frames
Examples
>>> # Create new trajectories from DataSelector
>>> selected = pipeline.trajectory.select_trajs("folded_frames")
>>> print(f"Created {len(selected)} new trajectories")
>>>
>>> # Use the returned trajectories for analysis
>>> for traj in selected:
...     print(f"Trajectory has {traj.n_frames} frames")
- superpose(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, reference_traj: int = 0, reference_frame: int = 0, atom_selection: str = 'backbone', inplace: bool = True) None
Superpose selected trajectories to a reference frame.
This method aligns all frames of selected trajectories to a specific reference frame using MDTraj’s superpose functionality. By default, the operation is performed in-place, overwriting the original Zarr cache for DaskMDTrajectory.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.superpose(traj_selection="all", reference_traj=0, reference_frame=0)  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.superpose(pipeline_data, traj_selection="all", reference_traj=0, reference_frame=0)  # pipeline_data required
Parameters
- pipeline_data : PipelineData
Pipeline data container with trajectory data
- traj_selection : int, str, list, or “all”
Selection of trajectories to align (required):
int: single trajectory by index
str: trajectory name, tag, or pattern (e.g., “tag:system_A”, “traj_*”)
list: multiple indices/names/tags
“all”: all loaded trajectories
- reference_traj : int, default=0
Index of trajectory containing the reference frame
- reference_frame : int, default=0
Frame index within reference trajectory to use as alignment reference
- atom_selection : str, default='backbone'
MDTraj selection string for atoms to use in alignment calculation. Common selections: “backbone”, “name CA”, “protein”, “resid 10 to 50”. See: https://mdtraj.org/1.9.4/atom_selection.html
- inplace : bool, default=True
Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.
Returns
- None
Modifies trajectories and updates pipeline_data.
Examples
Basic alignment:
>>> pipeline.trajectory.superpose(traj_selection="all")  # Align all to first frame of first trajectory
Specific reference:
>>> pipeline.trajectory.superpose(
...     traj_selection="all",
...     reference_traj=2,
...     reference_frame=100,
...     atom_selection="name CA"
... )
Notes
Dask trajectories (use_memmap=True) handle memory management automatically
All trajectories must have compatible topology for alignment
Large trajectories may take significant time to align
Raises
- ValueError
If no trajectories are loaded
- ValueError
If reference_traj index is invalid
- ValueError
If reference_frame index is invalid for reference trajectory
- ValueError
If traj_selection contains invalid indices/names
- ValueError
If atom_selection produces no atoms or incompatible atom counts
- center_coordinates(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, mass_weighted: bool = False, force: bool = False, inplace: bool = True) None
Center trajectory coordinates at the origin.
This method centers all frames of selected trajectories at the origin using either geometric centering (default) or mass-weighted centering. By default, the operation is performed in-place, overwriting the original Zarr cache for DaskMDTrajectory.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.center_coordinates(traj_selection="all")  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.center_coordinates(pipeline_data, traj_selection="all")  # pipeline_data required
Parameters
- pipeline_dataPipelineData
Pipeline data container with trajectory data
- traj_selectionint, str, list, or “all”
Selection of trajectories to center (required):
int: single trajectory by index
str: trajectory name, tag, or pattern (e.g., “tag:system_A”, “traj_*”)
list: multiple indices/names/tags
“all”: all loaded trajectories
- mass_weightedbool, default=False
Use mass-weighted centering instead of geometric centering. When True, the center of mass is used; when False, the geometric center (centroid) is used.
- forcebool, default=False
Whether to force centering even when features have been calculated. When True, existing features become invalid and should be recalculated.
- inplacebool, default=True
Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.
Returns
- None
Modifies trajectories and updates pipeline_data.
Examples
Basic geometric centering:
>>> pipeline.trajectory.center_coordinates(traj_selection="all")
Mass-weighted centering:
>>> pipeline.trajectory.center_coordinates(
...     traj_selection="all",
...     mass_weighted=True
... )
Notes
Dask trajectories (use_memmap=True) handle memory management automatically
Centering is useful before RMSD calculations or structural analysis
Mass-weighted centering is more physically meaningful for biomolecules
This operation modifies coordinates but preserves topology
Raises
- ValueError
If no trajectories are loaded
- ValueError
If traj_selection contains invalid indices/names
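The difference between geometric and mass-weighted centering can be shown on a single frame. The following is a minimal pure-Python sketch; `center_frame` is a hypothetical helper written for illustration, not the routine mdxplain or MDTraj uses.

```python
def center_frame(coords, masses=None):
    """Shift one frame so its centroid or centre of mass sits at the origin.

    coords: list of (x, y, z) tuples, one per atom
    masses: optional list of atomic masses; None selects geometric centering
    """
    weights = masses if masses is not None else [1.0] * len(coords)
    total = sum(weights)
    # Weighted mean position along each axis.
    center = [
        sum(w * atom[axis] for w, atom in zip(weights, coords)) / total
        for axis in range(3)
    ]
    return [tuple(atom[axis] - center[axis] for axis in range(3)) for atom in coords]


frame = [(0.0, 0.0, 0.0), (4.0, 0.0, 0.0)]
geometric = center_frame(frame)                         # centroid at x = 2.0
mass_weighted = center_frame(frame, masses=[1.0, 3.0])  # centre of mass at x = 3.0
print(geometric)      # [(-2.0, 0.0, 0.0), (2.0, 0.0, 0.0)]
print(mass_weighted)  # [(-3.0, 0.0, 0.0), (1.0, 0.0, 0.0)]
```

With equal masses the two modes coincide; they diverge as soon as heavy atoms are distributed unevenly.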
- smooth(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, width: int, order: int | None = None, atom_selection: str | None = None, force: bool = False, inplace: bool = True) -> None
Apply Savitzky-Golay smoothing filter to trajectory coordinates.
This method smooths trajectory coordinates using a Savitzky-Golay filter. By default, the operation is performed in-place, overwriting the original Zarr cache for DaskMDTrajectory. Smoothing can be applied to all atoms or a subset selected via atom_selection.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.smooth(traj_selection="all", width=5)  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.smooth(pipeline_data, traj_selection="all", width=5)  # pipeline_data required
Parameters
- pipeline_dataPipelineData
Pipeline data container with trajectory data
- traj_selectionint, str, list, or “all”
Selection of trajectories to smooth (required):
int: single trajectory by index
str: trajectory name, tag, or pattern (e.g., “tag:system_A”, “traj_*”)
list: multiple indices/names/tags
“all”: all loaded trajectories
- widthint
Smoothing window width. Should be odd; even values are adjusted internally (see Notes). Larger values produce smoother trajectories but may lose important structural details.
- orderint, optional
Polynomial order for Savitzky-Golay filter. If None, uses default from MDTraj implementation. Typical values: 2-4.
- atom_selectionstr, optional
MDTraj selection string for atoms to smooth. If None, smooths all atoms. Common selections: “backbone”, “name CA”, “protein”, “resid 10 to 50”. See: https://mdtraj.org/1.9.4/atom_selection.html
- forcebool, default=False
Whether to force smoothing even when features have been calculated. When True, existing features become invalid and should be recalculated.
- inplacebool, default=True
Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.
Returns
- None
Modifies trajectories and updates pipeline_data.
Examples
Basic smoothing of all atoms:
>>> pipeline.trajectory.smooth(traj_selection="all", width=5)
Smooth with specific polynomial order:
>>> pipeline.trajectory.smooth(
...     traj_selection="all",
...     width=7,
...     order=3
... )
Notes
Dask trajectories (use_memmap=True) handle memory management automatically
Smoothing reduces high-frequency noise but may obscure fast dynamics
Window width should be odd; even values will be adjusted internally
Larger windows create smoother trajectories but lose temporal resolution
This operation modifies coordinates but preserves topology
Raises
- ValueError
If no trajectories are loaded
- ValueError
If traj_selection contains invalid indices/names
- ValueError
If width is not positive
- ValueError
If atom_selection is invalid or produces no atoms
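To make the effect of width concrete, here is a minimal sketch of Savitzky-Golay smoothing applied to one coordinate of one atom over time. It uses the standard 5-point weights for a quadratic (or cubic) fit. The helper `savgol_5` and its edge handling are assumptions for illustration; the source does not say how mdxplain computes the filter internally.

```python
# Savitzky-Golay convolution weights for width=5 and polynomial order 2 or 3.
SG_WEIGHTS_5 = [-3 / 35, 12 / 35, 17 / 35, 12 / 35, -3 / 35]


def savgol_5(series):
    """Smooth a 1D series with a 5-point Savitzky-Golay filter.

    The first and last two samples are copied unchanged; this edge handling
    is an assumption of the sketch, not documented library behaviour.
    """
    half = len(SG_WEIGHTS_5) // 2
    smoothed = list(series)
    for i in range(half, len(series) - half):
        window = series[i - half:i + half + 1]
        smoothed[i] = sum(w * x for w, x in zip(SG_WEIGHTS_5, window))
    return smoothed


# x coordinate of one atom over seven frames, with a spike at frame 3.
x_coords = [1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0]
smoothed_x = savgol_5(x_coords)
print([round(v, 3) for v in smoothed_x])
```

The spike at frame 3 is damped and partly spread onto its neighbours, which is the loss of temporal resolution the notes above warn about. A series that already follows a quadratic in time passes through the interior of this filter unchanged.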
- image_molecules(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, anchor_molecules: np.ndarray | None = None, other_molecules: np.ndarray | None = None, make_whole: bool = True, force: bool = False, inplace: bool = True) -> None
Apply periodic boundary condition imaging to molecules.
This method recenters molecules and wraps them into the primary unit cell using MDTraj’s image_molecules method. By default, the operation is performed in-place, overwriting the original Zarr cache for DaskMDTrajectory.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.image_molecules(traj_selection="all")  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.image_molecules(pipeline_data, traj_selection="all")  # pipeline_data required
Parameters
- pipeline_dataPipelineData
Pipeline data container with trajectory data
- traj_selectionint, str, list, or “all”
Selection of trajectories to image (required):
int: single trajectory by index
str: trajectory name, tag, or pattern (e.g., “tag:system_A”, “traj_*”)
list: multiple indices/names/tags
“all”: all loaded trajectories
- anchor_moleculesnp.ndarray, optional
Indices of molecules to anchor at the origin. If None, uses all molecules. Molecules are typically defined by bonded groups in the topology.
- other_moleculesnp.ndarray, optional
Indices of other molecules to image relative to anchors. If None, uses all molecules not in anchor_molecules.
- make_wholebool, default=True
Make molecules whole across periodic boundary conditions before imaging. This ensures molecules are not split across the box boundary.
- forcebool, default=False
Whether to force imaging even when features have been calculated. When True, existing features become invalid and should be recalculated.
- inplacebool, default=True
Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.
Returns
- None
Modifies trajectories and updates pipeline_data.
Examples
Basic imaging (default parameters):
>>> pipeline.trajectory.image_molecules(traj_selection="all")
Image with specific anchor molecules:
>>> import numpy as np
>>> # Anchor protein (molecules 0-2), image solvent around it
>>> protein_molecules = np.array([0, 1, 2])
>>> pipeline.trajectory.image_molecules(
...     traj_selection=[0, 1],
...     anchor_molecules=protein_molecules,
...     make_whole=True
... )
Notes
Dask trajectories (use_memmap=True) handle memory management automatically
Requires trajectory to have periodic boundary condition information
Essential for correct visualization of periodic systems
Does not change topology or number of atoms
Make molecules whole first (make_whole=True) to prevent split-molecule artifacts
Raises
- ValueError
If no trajectories are loaded
- ValueError
If traj_selection contains invalid indices/names
- ValueError
If trajectory lacks unit cell information
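The core idea of wrapping into the primary unit cell can be sketched for the simplest case, an orthorhombic box with per-atom wrapping. This is a deliberately reduced illustration: `wrap_into_box` is a hypothetical helper, and it ignores the anchoring and make-whole steps that molecule-aware imaging performs.

```python
import math


def wrap_into_box(coords, box_lengths):
    """Wrap positions into [0, L) along each axis of an orthorhombic box."""
    return [
        tuple(x - math.floor(x / length) * length for x, length in zip(atom, box_lengths))
        for atom in coords
    ]


box = (10.0, 10.0, 10.0)
drifted = [(12.5, -1.0, 3.0), (-0.5, 25.0, 10.0)]
wrapped = wrap_into_box(drifted, box)
print(wrapped)  # [(2.5, 9.0, 3.0), (9.5, 5.0, 0.0)]
```

Wrapping atoms one by one can split a bonded molecule across the box boundary, which is why molecule-aware imaging with make_whole=True is the documented default.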
- remove_solvent(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, exclude: list | None = None, force: bool = False, inplace: bool = True) -> None
Remove solvent atoms from trajectories.
This method removes solvent atoms using MDTraj’s remove_solvent method, which identifies and removes common solvent molecules (water, ions, etc.). By default, the operation is performed in-place, overwriting the original Zarr cache for DaskMDTrajectory. The operation modifies both coordinates AND topology, changing the number of atoms in the trajectory. Labels are automatically adjusted to match the new residue structure.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.remove_solvent(traj_selection="all")  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.remove_solvent(pipeline_data, traj_selection="all")  # pipeline_data required
Parameters
- pipeline_dataPipelineData
Pipeline data container with trajectory data
- traj_selectionint, str, list, or “all”
Selection of trajectories to process (required):
int: single trajectory by index
str: trajectory name, tag, or pattern (e.g., “tag:system_A”, “traj_*”)
list: multiple indices/names/tags
“all”: all loaded trajectories
- excludelist, optional
List of solvent residue names to KEEP (not remove). Common values include ['HOH', 'WAT'] to keep water molecules while removing other solvents. If None, removes all recognized solvent molecules.
- forcebool, default=False
Whether to force removal even when features have been calculated. When True, existing features become invalid and should be recalculated.
- inplacebool, default=True
Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.
Returns
- None
Modifies trajectories and updates pipeline_data.
Examples
Remove all solvent:
>>> pipeline.trajectory.remove_solvent(traj_selection="all")
Keep water, remove other solvent:
>>> pipeline.trajectory.remove_solvent(
...     traj_selection="all",
...     exclude=['HOH', 'WAT']
... )
Notes
Dask trajectories (use_memmap=True) handle memory management automatically
This operation CHANGES the number of atoms and topology
Labels are automatically filtered to match new residue structure
Features must be recalculated after this operation
Common solvent names: HOH, WAT, SOL, Na+, Cl-, etc.
Raises
- ValueError
If no trajectories are loaded
- ValueError
If traj_selection contains invalid indices/names
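The semantics of exclude (names to keep, not names to remove) are easy to get backwards, so a small sketch may help. `strip_solvent` and the `SOLVENT_NAMES` set are hypothetical and chosen only for illustration; the actual list of recognized solvent residues is defined by MDTraj.

```python
# Hypothetical solvent list for illustration; the real list is defined by MDTraj.
SOLVENT_NAMES = {"HOH", "WAT", "SOL", "NA", "CL"}


def strip_solvent(atoms, exclude=None):
    """Return (kept_atoms, kept_indices) with solvent residues removed.

    atoms:   list of (atom_name, residue_name) pairs
    exclude: solvent residue names to keep rather than remove
    """
    keep_anyway = set(exclude or [])
    to_remove = SOLVENT_NAMES - keep_anyway
    kept_indices = [i for i, (_, res) in enumerate(atoms) if res not in to_remove]
    return [atoms[i] for i in kept_indices], kept_indices


atoms = [("CA", "ALA"), ("O", "HOH"), ("NA", "NA"), ("CA", "GLY")]
print(strip_solvent(atoms)[1])                   # [0, 3]
print(strip_solvent(atoms, exclude=["HOH"])[1])  # [0, 1, 3]
```

The returned index list is the kind of mapping needed to filter per-residue labels and to slice the coordinate array, since the atom count changes.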
- join(pipeline_data: PipelineData, target_traj: int | str, source_traj: int | str, check_topology: bool = True, remove_source: bool = True, new_name: str | None = None, force: bool = False) -> None
Join two trajectories along the frame axis (in-place on target).
This method concatenates frames from source_traj to target_traj, creating a single trajectory with combined frames. The target trajectory is modified in-place to contain all frames. Optionally, the source trajectory can be removed after joining, and the target can be renamed.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.join(target_traj=0, source_traj=1)  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.join(pipeline_data, target_traj=0, source_traj=1)  # pipeline_data required
Parameters
- pipeline_dataPipelineData
Pipeline data container with trajectory data
- target_trajint or str
Target trajectory (receives joined frames). Can be index or name.
- source_trajint or str
Source trajectory (provides frames to join). Can be index or name.
- check_topologybool, default=True
Whether to check topology compatibility between trajectories. When True, raises error if atom counts differ.
- remove_sourcebool, default=True
Whether to remove source trajectory after joining. When True, source_traj is deleted from trajectory list.
- new_namestr, optional
New name for target trajectory after joining. If None, keeps original target name.
- forcebool, default=False
Whether to force join even when features have been calculated. When True, existing features become invalid and should be recalculated.
Returns
- None
Modifies target trajectory in-place and optionally removes source.
Examples
Basic join (source removed):
>>> pipeline.trajectory.join(target_traj=0, source_traj=1)
Join and keep source:
>>> pipeline.trajectory.join(
...     target_traj=0,
...     source_traj=1,
...     remove_source=False
... )
Join with renaming:
>>> pipeline.trajectory.join(
...     target_traj="system_A_rep1",
...     source_traj="system_A_rep2",
...     new_name="system_A_combined"
... )
Join without topology check:
>>> pipeline.trajectory.join(
...     target_traj=0,
...     source_traj=1,
...     check_topology=False
... )
Notes
Dask trajectories (use_memmap=True) handle memory management automatically
Both trajectories must have compatible topologies (same atoms)
Time arrays are concatenated; check for time continuity separately
Labels are preserved for target trajectory only
Source trajectory labels are discarded
Raises
- ValueError
If no trajectories are loaded
- ValueError
If target_traj or source_traj are invalid
- ValueError
If target and source refer to same trajectory
- ValueError
If topologies are incompatible (when check_topology=True)
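Joining along the frame axis can be pictured with trajectories stored as nested lists of shape (frames, atoms, xyz). The sketch below is an assumption-laden illustration: `join_frames` is a hypothetical helper, and comparing atom counts stands in for the real topology check.

```python
def join_frames(target, source, check_topology=True):
    """Append the frames of source to target in place (frame-axis join).

    Each trajectory is a list of frames; each frame is a list of (x, y, z).
    """
    if target is source:
        raise ValueError("target and source refer to the same trajectory")
    if check_topology and target and source and len(target[0]) != len(source[0]):
        raise ValueError("atom counts differ between trajectories")
    target.extend(source)


traj_a = [[(0.0, 0.0, 0.0), (1.0, 0.0, 0.0)] for _ in range(3)]  # 3 frames, 2 atoms
traj_b = [[(0.0, 0.0, 0.5), (1.0, 0.0, 0.5)] for _ in range(2)]  # 2 frames, 2 atoms
join_frames(traj_a, traj_b)
print(len(traj_a), len(traj_a[0]))  # 5 2
```

The frame count grows while the atom count stays fixed, and the target object itself is modified, matching the in-place behaviour described above.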
- stack(pipeline_data: PipelineData, target_traj: int | str, source_traj: int | str, remove_source: bool = True, new_name: str | None = None, force: bool = False) -> None
Stack two trajectories along the atom axis (creates new trajectory).
This method combines atoms from source_traj with target_traj, creating a single trajectory that contains the atoms of both. The two trajectories must have identical frame counts. This is useful for combining protein and ligand trajectories or merging different molecular components. The target trajectory is replaced with the stacked result, and optionally the source can be removed.
Warning
When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.
Pipeline mode:
>>> pipeline = PipelineManager()
>>> pipeline.trajectory.stack(target_traj=0, source_traj=1)  # NO pipeline_data parameter
Standalone mode:
>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.stack(pipeline_data, target_traj=0, source_traj=1)  # pipeline_data required
Parameters
- pipeline_dataPipelineData
Pipeline data container with trajectory data
- target_trajint or str
Target trajectory (receives stacked atoms). Can be index or name.
- source_trajint or str
Source trajectory (provides atoms to stack). Can be index or name.
- remove_sourcebool, default=True
Whether to remove source trajectory after stacking. When True, source_traj is deleted from trajectory list.
- new_namestr, optional
New name for target trajectory after stacking. If None, keeps original target name.
- forcebool, default=False
Whether to force stack even when features have been calculated. When True, existing features become invalid and should be recalculated.
Returns
- None
Replaces target trajectory with stacked result and optionally removes source.
Examples
Basic stack (source removed):
>>> pipeline.trajectory.stack(target_traj=0, source_traj=1)
Stack and keep source:
>>> pipeline.trajectory.stack(
...     target_traj=0,
...     source_traj=1,
...     remove_source=False
... )
Stack with renaming:
>>> pipeline.trajectory.stack(
...     target_traj="protein",
...     source_traj="ligand",
...     new_name="complex"
... )
Notes
Dask trajectories (use_memmap=True) handle memory management automatically
Both trajectories MUST have same number of frames
Topologies are merged to create combined system
Labels from both trajectories are combined and renumbered
Useful for combining protein + ligand, or multiple chains
Raises
- ValueError
If no trajectories are loaded
- ValueError
If target_traj or source_traj are invalid
- ValueError
If target and source refer to same trajectory
- ValueError
If frame counts differ between trajectories
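Stacking along the atom axis can be sketched with trajectories stored as nested lists of shape (frames, atoms, xyz). `stack_atoms` is a hypothetical helper for illustration; it covers only the coordinate side and leaves out the topology merge and label renumbering that the method performs.

```python
def stack_atoms(target, source):
    """Return a new trajectory whose frames hold target atoms followed by source atoms."""
    if len(target) != len(source):
        raise ValueError("frame counts differ between trajectories")
    return [frame_t + frame_s for frame_t, frame_s in zip(target, source)]


protein = [[(0.0, 0.0, 0.0), (1.0, 0.0, 0.0)] for _ in range(3)]  # 3 frames, 2 atoms
ligand = [[(5.0, 5.0, 5.0)] for _ in range(3)]                    # 3 frames, 1 atom
complex_traj = stack_atoms(protein, ligand)
print(len(complex_traj), len(complex_traj[0]))  # 3 3
```

Here the atom count grows while the frame count stays fixed, and a new object is returned rather than extending the target in place.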