Trajectory Manager

Trajectory management module for loading and manipulating MD trajectory data.

class mdxplain.trajectory.manager.trajectory_manager.TrajectoryManager(stride: int = 1, concat: bool = False, selection: str | None = None, cache_dir: str = './cache', use_memmap: bool = False, chunk_size: int = 1000)

Manager for pure trajectory data objects without feature dependencies.

Provides methods to load, add, remove, slice, and select atoms in MD trajectories. This manager operates on TrajectoryData objects and does not depend on any feature data. It is designed to be used both standalone and within a pipeline context.

It handles various trajectory formats, automatic format detection, and provides a consistent interface for working with trajectory data.

It can load multiple trajectories, apply selections, and manage memory-efficient representations using DaskMDTrajectory.

It can load trajectories from directories and nested directories or lists of files, handle topology files, and apply MDTraj selection strings.

__init__(stride: int = 1, concat: bool = False, selection: str | None = None, cache_dir: str = './cache', use_memmap: bool = False, chunk_size: int = 1000)

Initialize trajectory manager.

Parameters

stride : int, default=1

Load every stride-th frame from trajectories. Use values > 1 to reduce memory usage and computation time by subsampling frames.

concat : bool, default=False

Whether to concatenate multiple trajectories per system into single trajectory objects. Useful when dealing with trajectory splits.

selection : str, optional

MDTraj selection string to apply to all loaded trajectories. See: https://mdtraj.org/1.9.4/atom_selection.html

use_memmap : bool, default=False

Whether to use memory-mapped DaskMDTrajectory for large files. When True, trajectories are loaded as DaskMDTrajectory objects for efficient memory usage.

chunk_size : int, default=1000

Chunk size for DaskMDTrajectory (only used when use_memmap=True). Number of frames per chunk for memory management.

cache_dir : str, default="./cache"

Directory for caching intermediate results and Zarr files.
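
As a rough illustration of how stride and chunk_size relate, the sketch below uses plain index arithmetic. It is not mdxplain's implementation: the helper names strided_frames and chunk_frames are hypothetical, and applying the chunking to the already-strided frames is an assumption made for the example.

```python
from typing import List


def strided_frames(n_frames: int, stride: int = 1) -> List[int]:
    """Return the frame indices kept when loading every stride-th frame."""
    if stride < 1:
        raise ValueError("stride must be >= 1")
    return list(range(0, n_frames, stride))


def chunk_frames(frames: List[int], chunk_size: int = 1000) -> List[List[int]]:
    """Split frame indices into consecutive chunks of at most chunk_size frames."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")
    return [frames[i:i + chunk_size] for i in range(0, len(frames), chunk_size)]


# Hypothetical trajectory with 10,000 frames, subsampled with stride=4.
kept = strided_frames(n_frames=10_000, stride=4)
# Assumption: chunks are formed from the frames that survive striding.
chunks = chunk_frames(kept, chunk_size=1000)
print(len(kept), len(chunks), len(chunks[-1]))
```

With these numbers, striding keeps a quarter of the frames, and the last chunk is shorter than chunk_size because the kept frames do not divide evenly.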

Returns

None

Initializes the TrajectoryManager instance with the given parameters

Examples

>>> traj_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(traj_data, '../data')

load_trajectories(pipeline_data: PipelineData, data_input: str | List[Any], concat: bool | None = None, stride: int | None = 1, selection: str | None = None, force: bool = False) -> None

Load molecular dynamics trajectories from files or directories into PipelineData.

This method handles loading of MD trajectories in various formats (e.g., .xtc, .dcd, .trr) along with their topology files. The loading is performed using the TrajectoryLoadHelper class which supports automatic format detection and multiple trajectory handling.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.load_trajectories('../data')  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.load_trajectories(pipeline_data, '../data')  # pipeline_data required
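
The difference between the two modes can be pictured as binding the first argument ahead of time. The following is a minimal sketch of that idea using functools.partial. All classes here (FakePipelineData, FakeTrajectoryManager, InjectingProxy, FakePipeline) are hypothetical stand-ins, and how PipelineManager actually performs the injection is not documented here.

```python
from functools import partial


class FakePipelineData:
    """Hypothetical stand-in for PipelineData."""

    def __init__(self) -> None:
        self.loaded = []


class FakeTrajectoryManager:
    """Hypothetical manager whose methods take pipeline_data first."""

    def load_trajectories(self, pipeline_data, data_input):
        pipeline_data.loaded.append(data_input)


class InjectingProxy:
    """Forwards attribute access and pre-binds pipeline_data (assumed mechanism)."""

    def __init__(self, manager, pipeline_data):
        self._manager = manager
        self._pipeline_data = pipeline_data

    def __getattr__(self, name):
        # Only called for names not found on the proxy itself.
        method = getattr(self._manager, name)
        return partial(method, self._pipeline_data)


class FakePipeline:
    """Hypothetical pipeline that owns the data container."""

    def __init__(self) -> None:
        self.data = FakePipelineData()
        self.trajectory = InjectingProxy(FakeTrajectoryManager(), self.data)


# Pipeline mode: the caller omits pipeline_data.
pipeline = FakePipeline()
pipeline.trajectory.load_trajectories("../data")

# Standalone mode: the caller passes pipeline_data explicitly.
standalone_data = FakePipelineData()
FakeTrajectoryManager().load_trajectories(standalone_data, "../data")
```

Under this model, passing pipeline_data in pipeline mode would supply the argument twice, which is why the warning above tells callers to omit it.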

Parameters

pipeline_data : PipelineData

Pipeline data container where trajectories will be stored

data_input : str or list

Path to directory containing trajectory files, or list of trajectory file paths. When a directory is provided, all supported trajectory files in that directory will be loaded.

concat : bool, optional

Whether to concatenate multiple trajectories per system into single trajectory objects. If None, uses manager default.

stride : int, optional

Load every stride-th frame from trajectories. If None, uses manager default.

selection : str, optional

MDTraj selection string to apply to all loaded trajectories. If None, uses manager default.

force : bool, default=False

Whether to force loading even when features have been calculated. When True, existing features become invalid and should be recalculated.

Returns

None

Loads trajectories into pipeline_data.trajectory_data and sets up topology/names

Examples

Pipeline mode (automatic injection):

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.load_trajectories('../data')

Standalone mode:

>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')

>>> # Load with tags
>>> traj_manager.load_trajectories(
...     pipeline_data, '../data', tags_file='tags.json'
... )

Notes

  • Supported formats depend on MDTraj capabilities

  • Topology files (.pdb, .gro, .psf) should be in the same directory

  • Large trajectories benefit from striding to reduce memory usage

  • Selection is applied to all trajectories after loading
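
To make the directory-based loading more concrete, here is a small sketch of how trajectory and topology files could be discovered in nested directories. It only illustrates the file layout described in the notes; the discover function and its grouping by parent directory are assumptions, and the extension sets are limited to the formats named in this section rather than everything MDTraj supports.

```python
import tempfile
from pathlib import Path
from typing import Dict, List

# Extensions named in this section; MDTraj supports more formats.
TRAJ_EXT = {".xtc", ".dcd", ".trr"}
TOP_EXT = {".pdb", ".gro", ".psf"}


def discover(root: Path) -> Dict[str, Dict[str, List[str]]]:
    """Group trajectory and topology files by the directory that holds them."""
    found: Dict[str, Dict[str, List[str]]] = {}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        suffix = path.suffix.lower()
        if suffix in TRAJ_EXT:
            kind = "trajectories"
        elif suffix in TOP_EXT:
            kind = "topologies"
        else:
            continue  # ignore unrelated files
        entry = found.setdefault(
            path.parent.name, {"trajectories": [], "topologies": []}
        )
        entry[kind].append(path.name)
    return found


# Build a throwaway directory tree with empty placeholder files.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for rel in (
        "system1/top.pdb", "system1/run1.xtc", "system1/run2.xtc",
        "system2/top.gro", "system2/run1.dcd", "system2/notes.txt",
    ):
        target = root / rel
        target.parent.mkdir(parents=True, exist_ok=True)
        target.touch()
    layout = discover(root)

print(layout)
```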

add_trajectory(pipeline_data: PipelineData, data_input: str | List[Any], concat: bool | None = None, stride: int | None = 1, selection: str | None = None) -> None

Add molecular dynamics trajectories to TrajectoryData object.

This method works like load_trajectories but appends new trajectories instead of replacing existing ones. Useful for loading additional trajectory data without losing previously loaded trajectories.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.add_trajectory('../data2')  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.add_trajectory(pipeline_data, '../data2')  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data object

data_input : str or list

Path to directory containing trajectory files, or list of trajectory file paths. When a directory is provided, all supported trajectory files in that directory will be loaded.

concat : bool, optional

Whether to concatenate multiple trajectories per system into single trajectory objects. If None, uses manager default.

stride : int, optional

Load every stride-th frame from trajectories. If None, uses manager default.

selection : str, optional

MDTraj selection string to apply to all newly loaded trajectories. If None, uses manager default.

Returns

None

Appends new trajectories to existing trajectory list in pipeline_data

Examples

>>> traj_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(traj_data, '../data')
>>> traj_manager.add_trajectory(traj_data, '../data2')

Raises

ValueError

If no trajectories are currently loaded

Notes

  • New trajectories are appended to existing ones

  • Trajectory names are also appended to maintain consistency

  • Selection is only applied to newly loaded trajectories

  • Existing trajectories remain unchanged

remove_trajectory(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, force: bool = False) -> None

Remove specified trajectories from the loaded trajectory list.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.remove_trajectory([0, 1])  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.remove_trajectory(pipeline_data, [0, 1])  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data object

traj_selection : int, str, list, or “all”

Trajectory selection (required). Options:

  • int: single trajectory by index (e.g., 0)

  • str: trajectory name or “all” for all trajectories

  • list: multiple indices/names (can be mixed)

  • “all”: all loaded trajectories

force : bool, default=False

Whether to force removal even when features have been calculated. When True, existing features become invalid and should be recalculated.

Returns

None

Removes trajectories from pipeline_data

Examples

>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')
>>> traj_manager.remove_trajectory(pipeline_data, [0, 1])

Raises

ValueError

If trajectories are not loaded or if traj_selection contains invalid indices or names
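
The selection rules above (index, name, mixed list, or "all") can be sketched as a small resolver that turns a selection into trajectory indices. This is an illustration only: resolve_selection is a hypothetical helper, not part of mdxplain, and it ignores the tag and pattern forms that other methods accept.

```python
from typing import List, Union

Selector = Union[int, str, List[Union[int, str]]]


def resolve_selection(selection: Selector, names: List[str]) -> List[int]:
    """Map an int, a name, a mixed list, or "all" to trajectory indices."""
    if not names:
        raise ValueError("No trajectories loaded")
    if selection == "all":
        return list(range(len(names)))
    items = selection if isinstance(selection, list) else [selection]
    indices: List[int] = []
    for item in items:
        if isinstance(item, int):
            if not 0 <= item < len(names):
                raise ValueError(f"Invalid trajectory index: {item}")
            index = item
        elif item in names:
            index = names.index(item)
        else:
            raise ValueError(f"Unknown trajectory name: {item!r}")
        if index not in indices:  # keep each trajectory once
            indices.append(index)
    return indices


names = ["run_a", "run_b", "run_c"]
to_remove = resolve_selection([0, "run_c"], names)
remaining = [name for i, name in enumerate(names) if i not in to_remove]
print(to_remove, remaining)
```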

slice_traj(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, frames: int | slice | List[int] | None = None, stride: int | None = 1, cut: int | None = None, data_selector: str | None = None, force: bool = False, inplace: bool = True) -> None

Slice trajectories by frame specification and stride, or by the frames of a DataSelector.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.slice_traj(traj_selection="all", frames=1000)  # NO pipeline_data parameter
>>> pipeline.trajectory.slice_traj(traj_selection="all", data_selector="folded_frames")  # Use DataSelector

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.slice_traj(pipeline_data, frames=1000, traj_selection="all")  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data object

traj_selection : int, str, list, or “all”

Selection of trajectories to process (required):

  • int: trajectory index

  • str: trajectory name or “all” for all trajectories

  • list: list of indices/names

  • “all”: all loaded trajectories

frames : int, slice, list, optional

Frame specification for slicing:

  • int: include frames 0 to frames - 1 (e.g., frames=1000 → frames 0-999)

  • slice: direct slice object (e.g., slice(100, 500) → frames 100-499)

  • list: specific frame indices (e.g., [0, 10, 20, 30])

Ignored if data_selector is provided.

stride : int, optional

Take every stride-th frame. Use values > 1 to subsample frames. If None, uses manager default.

cut : int, optional

Frame number after which to cut trajectories. Frames after this index will be removed. Applied after frame selection and stride.

data_selector : str, optional

Name of DataSelector to use for frame selection. If provided, overrides frames/stride parameters and uses the selected frames from the DataSelector.

force : bool, default=False

Whether to force slicing even when features have been calculated. When True, existing features become invalid and should be recalculated.

inplace : bool, default=True

Whether to perform the operation in-place. For DaskMDTrajectory, slicing returns a lazy view which is updated in the pipeline.

Returns

None

Modifies trajectories and updates pipeline_data.

Examples

>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')
>>> traj_manager.slice_traj(pipeline_data, traj_selection="all", frames=1000)

Raises

ValueError

If trajectories are not loaded, if traj_selection contains invalid indices or names, or if the DataSelector does not exist
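
The way frames, stride, and cut combine can be sketched with plain list slicing. The helper frame_indices below is hypothetical and only mirrors the parameter descriptions above. In particular, treating cut as "keep the first cut frames of the result" is an assumption, since the exact boundary behavior is not spelled out here.

```python
from typing import List, Optional, Union

Frames = Union[int, slice, List[int], None]


def frame_indices(
    n_frames: int,
    frames: Frames = None,
    stride: int = 1,
    cut: Optional[int] = None,
) -> List[int]:
    """Return the original frame indices that survive frames, stride, and cut."""
    all_frames = list(range(n_frames))
    if frames is None:
        selected = all_frames
    elif isinstance(frames, int):
        selected = all_frames[:frames]      # frames 0 .. frames - 1
    elif isinstance(frames, slice):
        selected = all_frames[frames]       # e.g. slice(100, 500) -> 100 .. 499
    else:
        selected = [all_frames[i] for i in frames]  # explicit frame list
    selected = selected[::stride]           # stride is applied after frame selection
    if cut is not None:
        selected = selected[:cut]           # assumption: keep the first `cut` frames
    return selected


first_thousand = frame_indices(5000, frames=1000)
strided_window = frame_indices(5000, frames=slice(100, 500), stride=100)
cut_list = frame_indices(5000, frames=[0, 10, 20, 30], cut=2)
print(len(first_thousand), strided_window, cut_list)
```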

select_atoms(pipeline_data: PipelineData, selection: str, traj_selection: int | str | List[int | str] | 'all', force: bool = False, inplace: bool = True) -> None

Apply atom selection to trajectories using MDTraj selection syntax.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.select_atoms("protein", "all")  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.select_atoms(pipeline_data, "protein", "all")  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data object

traj_selection : int, str, list, or “all”

Selection of trajectories to process:

  • int: trajectory index

  • str: trajectory name or “all” for all trajectories

  • list: list of indices/names

selection : str

MDTraj selection string (e.g., "protein", "backbone", "resid 10 to 50"). See: https://mdtraj.org/1.9.4/atom_selection.html

force : bool, default=False

Whether to force atom selection even when features have been calculated. When True, existing features become invalid and should be recalculated.

inplace : bool, default=True

Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.

Returns

None

Applies atom selection to trajectories and updates pipeline_data

Examples

>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')
>>> traj_manager.select_atoms(pipeline_data, "protein", "all")
>>> # Select atoms from specific trajectories
>>> traj_manager.select_atoms(pipeline_data, "protein", [0, 1, 2])

Raises

ValueError

If trajectories are not loaded or if selection or traj_selection contains invalid values

add_labels(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | 'all', fragment_definition: str | Dict[str, Tuple[int, int]] | None = None, fragment_type: str | Dict[str, str] | None = None, fragment_molecule_name: str | Dict[str, str] | None = None, consensus: bool = False, aa_short: bool = False, try_web_lookup: bool = True, force: bool = False, **nomenclature_kwargs) -> None

Add nomenclature labels to selected trajectories.

This method creates consensus labels for MD trajectories using different mdciao nomenclature systems (GPCR, CGN, KLIFS). Different systems can have different nomenclatures by applying labels to specific trajectory selections.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.add_labels([0, 1], fragment_definition="receptor")  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.add_labels(pipeline_data, [0, 1], fragment_definition="receptor")  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data object

traj_selection : int, str, list, or “all”

Selection of trajectories to add labels to:

  • int: trajectory index

  • str: trajectory name or “all” for all trajectories

  • list: list of indices/names

  • “all”: all loaded trajectories

fragment_definition : str or dict, default None

If string, uses that as the fragment name for the entire topology. If dict, maps fragment names to residue ranges: {"cgn_a": (0, 348), "beta2": (400, 684)}. Only required when consensus=True.

fragment_type : str or dict, default None

If string, uses that nomenclature type for all fragments. If dict, maps fragment names to nomenclature types: {"cgn_a": "cgn", "beta2": "gpcr"}. Use mdciao nomenclature types. Allowed types: gpcr, cgn, klifs. Only required when consensus=True.

fragment_molecule_name : str or dict, default None

If string, uses that molecule name for all fragments. If dict, maps fragment names to molecule names: {"cgn_a": "gnas2_bovin", "beta2": "adrb2_human"}. Use the UniProt entry name (not accession ID) for GPCR/CGN labelers, or the KLIFS string for KLIFS labelers. See https://www.uniprot.org/help/difference_accession_entryname for UniProt naming conventions. See https://proteinformatics.uni-leipzig.de/mdciao/api/generated/generated/mdciao.nomenclature.LabelerKLIFS.html#mdciao.nomenclature.LabelerKLIFS for KLIFS naming conventions. Only required when consensus=True.

consensus : bool, default False

Whether to use consensus labeling (combines AA codes with nomenclature labels). If False, only returns amino acid labels without nomenclature.

aa_short : bool, default False

Whether to use short amino acid names (T vs THR)

verbose : bool, default False

Whether to enable verbose output from labelers

try_web_lookup : bool, default True

Whether to try web lookup for molecule data

force : bool, default False

Whether to force label addition even when labels already exist. When True, existing labels will be overwritten. When False, raises ValueError if labels exist.

write_to_disk : bool, default False

Whether to write cache files to disk

cache_folder : str, default "./cache"

Folder for cache files

nomenclature_kwargs

Additional keyword arguments passed to the mdciao labelers

Returns

None

Raises

ValueError

If trajectories are not loaded

ValueError

If traj_selection contains invalid indices or names

ValueError

If nomenclature labels already exist for selected trajectories and force=False

ValueError

If fragment_definition is missing when consensus=True

ValueError

If fragment_type is missing when consensus=True

ValueError

If fragment_molecule_name is missing when consensus=True

ValueError

If fragment_definition is not a string or dictionary

ValueError

If fragment_type is not a string or dictionary

ValueError

If fragment_molecule_name is not a string or dictionary
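
The argument checks listed under Raises can be summarized in a short validation sketch. The function check_fragment_args is hypothetical and only mirrors the documented conditions (arguments required when consensus=True, and each must be a string or dictionary). It does not reproduce the library's actual error messages or check order.

```python
from typing import Any


def check_fragment_args(consensus: bool, **fragment_args: Any) -> None:
    """Raise ValueError for missing or wrongly typed fragment arguments."""
    for name, value in fragment_args.items():
        if value is None:
            if consensus:
                raise ValueError(f"{name} is required when consensus=True")
            continue  # optional when consensus=False
        if not isinstance(value, (str, dict)):
            raise ValueError(f"{name} must be a string or dictionary")


# Valid: all three arguments given as strings with consensus labeling.
check_fragment_args(
    True,
    fragment_definition="receptor",
    fragment_type="gpcr",
    fragment_molecule_name="adrb2_human",
)

# Valid: a dict mapping fragment names to residue ranges.
check_fragment_args(
    True,
    fragment_definition={"cgn_a": (0, 348), "beta2": (400, 684)},
    fragment_type={"cgn_a": "cgn", "beta2": "gpcr"},
    fragment_molecule_name={"cgn_a": "gnas2_bovin", "beta2": "adrb2_human"},
)

# Valid: nothing is required when consensus=False.
check_fragment_args(False, fragment_definition=None, fragment_type=None)
```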

Notes

This method wraps mdciao consensus nomenclature systems: https://proteinformatics.uni-leipzig.de/mdciao/api/generated/mdciao.nomenclature.html

Supported fragment types: gpcr, cgn, klifs.

Examples

>>> pipeline_data = PipelineData()
>>> traj_manager = TrajectoryManager()
>>> traj_manager.load_trajectories(pipeline_data, '../data')
>>> # Add labels to specific trajectories (different systems)
>>> traj_manager.add_labels(
...     pipeline_data, [0, 1], fragment_definition="receptor", fragment_type="gpcr",
...     fragment_molecule_name="adrb2_human", consensus=True
... )
>>> traj_manager.add_labels(
...     pipeline_data, [2, 3], fragment_definition="kinase", fragment_type="klifs",
...     fragment_molecule_name="abl1_human", consensus=True
... )
>>> # Add labels to all trajectories
>>> traj_manager.add_labels(pipeline_data, "all", fragment_definition="protein")
>>> # Adding labels again requires force=True
>>> traj_manager.add_labels(pipeline_data, "all", force=True)  # Overwrites existing labels

add_tags(pipeline_data: PipelineData, trajectory_selector: int | str | List[Any] | range | Dict[Any, List[str]], tags: List[str] | None = None) -> None

Add tags to trajectories using flexible selectors.

This method supports single trajectories, multiple trajectories, pattern matching, and bulk assignment using dictionaries. It provides a powerful interface for managing trajectory tags in complex scenarios.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.add_tags(0, ["system_A"])  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.add_tags(pipeline_data, 0, ["system_A"])  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data object

trajectory_selector : int, str, list, range, dict

Flexible selector for trajectories:

  • int: single trajectory by index (e.g., 0)

  • str: single trajectory by name or pattern (e.g., “traj1”, “system_*”)

    Supports multiple string formats:

    • Range: “0-3”, “id 0-3” → [0, 1, 2, 3]

    • Comma list: “1,2,4,5”, “id 1,2,4,5” → [1, 2, 4, 5]

    • Single number: “7”, “id 7” → [7]

    • Pattern: “system_*” → fnmatch pattern matching

  • list: multiple selectors (e.g., [0, 1, “special_traj”, range(5,8)])

  • range: range of indices (e.g., range(0, 4))

  • dict: bulk assignment {selector: tags, …}

tags : list, optional

List of tag strings to add. Required when trajectory_selector is not dict. Ignored when trajectory_selector is dict.

Returns

None

Adds tags to selected trajectories and rebuilds frame mapping

Examples

>>> # Single trajectory
>>> traj_manager.add_tags(pipeline_data, 0, ["system_A", "biased"])
>>> traj_manager.add_tags(pipeline_data, "traj1", ["system_B"])
>>> # Multiple trajectories
>>> traj_manager.add_tags(pipeline_data, [0, 1, 2], ["control"])
>>> traj_manager.add_tags(pipeline_data, range(0, 4), ["batch_1"])
>>> # Pattern matching
>>> traj_manager.add_tags(pipeline_data, "system_2_*", ["system_B"])
>>> # Complex nested selectors
>>> traj_manager.add_tags(pipeline_data, [range(0,3), "system_2_*"], ["mixed"])
>>> # Bulk assignment with dict
>>> traj_manager.add_tags(pipeline_data, {
...     "0-3": ["system_A", "unbiased"],
...     "4-7": ["control"],
...     "special_traj": ["control"],
...     "system_2_*": ["system_B", "production"]
... })

Raises

ValueError

If tags is None when trajectory_selector is not dict

ValueError

If trajectory selector contains invalid indices or names
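
The string formats accepted by trajectory_selector can be illustrated with a small parser. This is a sketch under assumptions: parse_string_selector is a hypothetical helper, it does not check numeric indices against the number of loaded trajectories, and the library's real parsing order may differ.

```python
import re
from fnmatch import fnmatchcase
from typing import List


def parse_string_selector(selector: str, names: List[str]) -> List[int]:
    """Turn a string selector into a list of trajectory indices."""
    text = selector.strip()
    if text.startswith("id "):
        text = text[3:].strip()          # optional "id " prefix
    range_match = re.fullmatch(r"(\d+)-(\d+)", text)
    if range_match:                      # "0-3" -> [0, 1, 2, 3] (inclusive)
        start, stop = int(range_match.group(1)), int(range_match.group(2))
        return list(range(start, stop + 1))
    if re.fullmatch(r"\d+(\s*,\s*\d+)*", text):
        # "1,2,4,5" -> [1, 2, 4, 5]; a single number such as "7" -> [7]
        return [int(part) for part in text.split(",")]
    # Otherwise treat the selector as a name or fnmatch-style pattern.
    matches = [i for i, name in enumerate(names) if fnmatchcase(name, selector)]
    if not matches:
        raise ValueError(f"No trajectory matches selector {selector!r}")
    return matches


names = ["system_1_a", "system_2_a", "system_2_b", "special_traj"]
print(parse_string_selector("0-3", names))
print(parse_string_selector("id 1,2,4,5", names))
print(parse_string_selector("system_2_*", names))
```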

set_tags(pipeline_data: PipelineData, trajectory_selector: int | str | List[Any] | range | Dict[Any, List[str]], tags: List[str] | None = None) -> None

Set (replace) tags for trajectories using flexible selectors.

This method completely replaces existing tags instead of merging them. It supports the same flexible selector system as add_tags() but provides replacement semantics for tag management scenarios where you need to reset or completely change trajectory tags.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.set_tags(0, ["system_A"])  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.set_tags(pipeline_data, 0, ["system_A"])  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data object

trajectory_selector : int, str, list, range, dict

Flexible selector for trajectories:

  • int: single trajectory by index (e.g., 0)

  • str: single trajectory by name or pattern (e.g., “traj1”, “system_*”)

    Supports multiple string formats:

    • Range: “0-3”, “id 0-3” → [0, 1, 2, 3]

    • Comma list: “1,2,4,5”, “id 1,2,4,5” → [1, 2, 4, 5]

    • Single number: “7”, “id 7” → [7]

    • Pattern: “system_*” → fnmatch pattern matching

  • list: multiple selectors (e.g., [0, 1, “special_traj”, range(5,8)])

  • range: range of indices (e.g., range(0, 4))

  • dict: bulk assignment {selector: tags, …}

tags : list, optional

List of tag strings to set. Required when trajectory_selector is not dict. Ignored when trajectory_selector is dict.

Returns

None

Sets tags for selected trajectories and rebuilds frame mapping

Examples

>>> # Replace tags for single trajectory
>>> traj_manager.add_tags(pipeline_data, 0, ["old_tag", "other"])
>>> traj_manager.set_tags(pipeline_data, 0, ["new_tag"])  # Replaces both old tags
>>> # Reset multiple trajectories to same tags
>>> traj_manager.set_tags(pipeline_data, [0, 1, 2], ["reset", "control"])
>>> # Clear all tags (set to empty)
>>> traj_manager.set_tags(pipeline_data, "all", [])
>>> # Bulk replacement with dict
>>> traj_manager.set_tags(pipeline_data, {
...     "0-1": ["system_A", "production"],
...     "2-3": ["system_B", "test"],
...     "control_*": ["control"]
... })

Raises

ValueError

If tags is None when trajectory_selector is not dict

ValueError

If trajectory selector contains invalid indices or names
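
The difference between merging (add_tags) and replacing (set_tags) can be shown on a plain dictionary. The functions merge_tags and replace_tags are hypothetical illustrations. Whether the library skips duplicate tags or preserves insertion order when merging is an assumption here.

```python
from typing import Dict, List


def merge_tags(store: Dict[int, List[str]], index: int, tags: List[str]) -> None:
    """Add tags to a trajectory, keeping existing ones (assumed: no duplicates)."""
    current = store.setdefault(index, [])
    for tag in tags:
        if tag not in current:
            current.append(tag)


def replace_tags(store: Dict[int, List[str]], index: int, tags: List[str]) -> None:
    """Discard any existing tags and store exactly the given ones."""
    store[index] = list(tags)


store: Dict[int, List[str]] = {}
merge_tags(store, 0, ["old_tag", "other"])
merge_tags(store, 0, ["other", "extra"])
after_merge = list(store[0])

replace_tags(store, 0, ["new_tag"])
after_replace = list(store[0])

replace_tags(store, 0, [])  # an empty list clears all tags
after_clear = list(store[0])
print(after_merge, after_replace, after_clear)
```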

rename_trajectories(pipeline_data: PipelineData, name_mapping: Dict[int | str, str] | List[str]) -> None

Rename trajectory names.

This method allows renaming trajectory names for better organization and more descriptive identification. Supports both dictionary-based mapping and positional list assignment.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.rename_trajectories({0: "new_name"})  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.rename_trajectories(pipeline_data, {0: "new_name"})  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data object

name_mapping : dict or list

Mapping for trajectory names:

  • dict: {old_name_or_index: new_name, …} for selective renaming

  • list: [new_name1, new_name2, …] for positional assignment

Returns

None

Renames trajectories and rebuilds frame tag mapping

Examples

>>> # Dictionary-based renaming
>>> traj_manager.rename_trajectories(pipeline_data, {
...     0: "system_A_replicate_1",
...     "old_traj_name": "system_B_replicate_1",
...     2: "control_experiment"
... })
>>> # Positional list assignment
>>> traj_manager.rename_trajectories(pipeline_data, [
...     "system_A_rep1",
...     "system_A_rep2",
...     "system_B_rep1",
...     "control"
... ])

Raises

ValueError

If no trajectories are loaded, mapping is invalid, or references invalid trajectories
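
The two mapping styles can be sketched as a function over a plain list of names. apply_rename is a hypothetical helper. Requiring a list mapping to contain exactly one name per loaded trajectory is an assumption about what counts as an invalid mapping.

```python
from typing import Dict, List, Union

NameMapping = Union[Dict[Union[int, str], str], List[str]]


def apply_rename(names: List[str], mapping: NameMapping) -> List[str]:
    """Return new trajectory names after dict-based or positional renaming."""
    if not names:
        raise ValueError("No trajectories loaded")
    if isinstance(mapping, list):
        # Positional assignment (assumed: one new name per trajectory).
        if len(mapping) != len(names):
            raise ValueError("List mapping must provide one name per trajectory")
        return list(mapping)
    renamed = list(names)
    for key, new_name in mapping.items():
        if isinstance(key, int):
            if not 0 <= key < len(names):
                raise ValueError(f"Invalid trajectory index: {key}")
            index = key
        elif key in names:
            index = names.index(key)     # look up by the old name
        else:
            raise ValueError(f"Unknown trajectory name: {key!r}")
        renamed[index] = new_name
    return renamed


names = ["traj_0", "old_traj_name", "traj_2"]
by_dict = apply_rename(names, {0: "system_A_replicate_1", "old_traj_name": "system_B_replicate_1"})
by_list = apply_rename(names, ["system_A_rep1", "system_A_rep2", "control"])
print(by_dict, by_list)
```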

reset_trajectory_data(pipeline_data: PipelineData) -> None

Reset the trajectory data object to empty state.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.reset_trajectory_data()  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.reset_trajectory_data(pipeline_data)  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data object

Returns

None

Resets trajectory data to empty state

Examples

>>> traj_manager.reset_trajectory_data(pipeline_data)

save(pipeline_data: PipelineData, save_path: str) -> None

Save trajectory data to disk.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.save('trajectory_backup.pkl')  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.save(pipeline_data, 'trajectory_backup.pkl')  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container with trajectory data

save_path : str

Path where to save the trajectory data

Returns

None

Saves the trajectory data to the specified path

Examples

>>> trajectory_manager.save(pipeline_data, 'trajectory_backup.pkl')

load(pipeline_data: PipelineData, load_path: str) -> None

Load trajectory data from disk.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.load('trajectory_backup.pkl')  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.load(pipeline_data, 'trajectory_backup.pkl')  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container to load trajectory data into

load_path : str

Path to the saved trajectory data file

Returns

None

Loads the trajectory data from the specified path

Examples

>>> trajectory_manager.load(pipeline_data, 'trajectory_backup.pkl')

print_info(pipeline_data: PipelineData) -> None

Print trajectory information.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.print_info()  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.print_info(pipeline_data)  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container with trajectory data

Returns

None

Prints trajectory information to console

Examples

>>> trajectory_manager.print_info(pipeline_data)
=== TrajectoryData ===
Loaded 3 trajectories:
  [0] system1_prot_traj1: 1000 frames, tags: ['system_A', 'biased']
  [1] system1_prot_traj2: 1500 frames, tags: ['system_A', 'unbiased']
  [2] system2_prot_traj1: 800 frames, tags: ['system_B', 'biased']

select_trajs(pipeline_data: PipelineData, data_selector: str) -> List[DaskMDTrajectory | md.Trajectory]

Create new trajectory objects from DataSelector frames.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> selected = pipeline.trajectory.select_trajs("folded_frames")  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> selected = manager.select_trajs(pipeline_data, "folded_frames")  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data with trajectories and DataSelector

data_selector : str

Name of DataSelector to use

Returns

List[Union[DaskMDTrajectory, md.Trajectory]]

List of new trajectory objects with selected frames

Examples

>>> # Create new trajectories from DataSelector
>>> selected = pipeline.trajectory.select_trajs("folded_frames")
>>> print(f"Created {len(selected)} new trajectories")
>>> 
>>> # Use the returned trajectories for analysis
>>> for traj in selected:
...     print(f"Trajectory has {traj.n_frames} frames")

superpose(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, reference_traj: int = 0, reference_frame: int = 0, atom_selection: str = 'backbone', inplace: bool = True) -> None

Superpose selected trajectories to a reference frame.

This method aligns all frames of selected trajectories to a specific reference frame using MDTraj’s superpose functionality. By default, the operation is performed in-place, overwriting the original Zarr cache for DaskMDTrajectory.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.superpose(traj_selection="all", reference_traj=0, reference_frame=0)  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.superpose(pipeline_data, traj_selection="all", reference_traj=0, reference_frame=0)  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container with trajectory data

traj_selection : int, str, list, or “all”

Selection of trajectories to align (required):

  • int: single trajectory by index

  • str: trajectory name, tag, or pattern (e.g., “tag:system_A”, “traj_*”)

  • list: multiple indices/names/tags

  • “all”: all loaded trajectories

reference_traj : int, default=0

Index of trajectory containing the reference frame

reference_frame : int, default=0

Frame index within reference trajectory to use as alignment reference

atom_selection : str, default="backbone"

MDTraj selection string for atoms to use in alignment calculation. Common selections: "backbone", "name CA", "protein", "resid 10 to 50". See: https://mdtraj.org/1.9.4/atom_selection.html

inplace : bool, default=True

Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.

Returns

None

Modifies trajectories and updates pipeline_data.

Examples

Basic alignment:

>>> pipeline.trajectory.superpose(traj_selection="all")  # Align all to first frame of first trajectory

Specific reference:

>>> pipeline.trajectory.superpose(
...     traj_selection="all",
...     reference_traj=2,
...     reference_frame=100,
...     atom_selection="name CA"
... )

Notes

  • Dask trajectories (use_memmap=True) handle memory management automatically

  • All trajectories must have compatible topology for alignment

  • Large trajectories may take significant time to align

Raises

ValueError

If no trajectories are loaded

ValueError

If reference_traj index is invalid

ValueError

If reference_frame index is invalid for reference trajectory

ValueError

If traj_selection contains invalid indices/names

ValueError

If atom_selection produces no atoms or incompatible atom counts

center_coordinates(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, mass_weighted: bool = False, force: bool = False, inplace: bool = True) -> None

Center trajectory coordinates at the origin.

This method centers all frames of selected trajectories at the origin using either geometric centering (default) or mass-weighted centering. By default, the operation is performed in-place, overwriting the original Zarr cache for DaskMDTrajectory.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.center_coordinates(traj_selection="all")  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.center_coordinates(pipeline_data, traj_selection="all")  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container with trajectory data

traj_selection : int, str, list, or "all"

Selection of trajectories to center (required):

  • int: single trajectory by index

  • str: trajectory name, tag, or pattern (e.g., "tag:system_A", "traj_*")

  • list: multiple indices/names/tags

  • "all": all loaded trajectories

mass_weighted : bool, default=False

Use mass-weighted centering instead of geometric centering. When True, the center of mass is used; when False, the geometric center (centroid) is used.

force : bool, default=False

Whether to force centering even when features have been calculated. When True, existing features become invalid and should be recalculated.

inplace : bool, default=True

Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.

Returns

None

Modifies trajectories and updates pipeline_data.

Examples

Basic geometric centering:

>>> pipeline.trajectory.center_coordinates(traj_selection="all")

Mass-weighted centering:

>>> pipeline.trajectory.center_coordinates(
...     traj_selection="all",
...     mass_weighted=True
... )

Notes

  • Dask trajectories (use_memmap=True) handle memory management automatically

  • Centering is useful before RMSD calculations or structural analysis

  • Mass-weighted centering is more physically meaningful for biomolecules

  • This operation modifies coordinates but preserves topology
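
The difference between geometric and mass-weighted centering can be shown on a single frame. The following is a minimal pure-Python sketch of the arithmetic only; the function center_frame and its arguments are hypothetical and are not part of mdxplain or MDTraj.

```python
def center_frame(coords, masses=None):
    """Return coordinates shifted so their (weighted) center is at the origin.

    coords: list of [x, y, z] per atom; masses: optional list of atom masses.
    """
    n_atoms = len(coords)
    if masses is None:
        # Geometric centering: every atom contributes equally.
        weights = [1.0 / n_atoms] * n_atoms
    else:
        # Mass-weighted centering: heavier atoms pull the center toward them.
        total = sum(masses)
        weights = [m / total for m in masses]
    center = [sum(w * atom[k] for w, atom in zip(weights, coords)) for k in range(3)]
    return [[atom[k] - center[k] for k in range(3)] for atom in coords]


frame = [[0.0, 0.0, 0.0], [2.0, 0.0, 0.0]]
print(center_frame(frame))                     # [[-1.0, 0.0, 0.0], [1.0, 0.0, 0.0]]
print(center_frame(frame, masses=[1.0, 3.0]))  # [[-1.5, 0.0, 0.0], [0.5, 0.0, 0.0]]
```

With equal weights the centroid sits midway between the two atoms; with masses 1 and 3 the center of mass moves toward the heavier atom, so the shifted coordinates differ.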

Raises

ValueError

If no trajectories are loaded

ValueError

If traj_selection contains invalid indices/names

smooth(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, width: int, order: int | None = None, atom_selection: str | None = None, force: bool = False, inplace: bool = True) -> None

Apply Savitzky-Golay smoothing filter to trajectory coordinates.

This method smooths trajectory coordinates using a Savitzky-Golay filter. By default, the operation is performed in-place, overwriting the original Zarr cache for DaskMDTrajectory. Smoothing can be applied to all atoms or a subset selected via atom_selection.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.smooth(traj_selection="all", width=5)  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.smooth(pipeline_data, traj_selection="all", width=5)  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container with trajectory data

traj_selection : int, str, list, or "all"

Selection of trajectories to smooth (required):

  • int: single trajectory by index

  • str: trajectory name, tag, or pattern (e.g., "tag:system_A", "traj_*")

  • list: multiple indices/names/tags

  • "all": all loaded trajectories

width : int

Smoothing window width. Should be odd; even values are adjusted internally. Larger values produce smoother trajectories but may lose important structural details.

order : int, optional

Polynomial order for Savitzky-Golay filter. If None, uses default from MDTraj implementation. Typical values: 2-4.

atom_selection : str, optional

MDTraj selection string for atoms to smooth. If None, smooths all atoms. Common selections: "backbone", "name CA", "protein", "resid 10 to 50". See: https://mdtraj.org/1.9.4/atom_selection.html

force : bool, default=False

Whether to force smoothing even when features have been calculated. When True, existing features become invalid and should be recalculated.

inplace : bool, default=True

Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.

Returns

None

Modifies trajectories and updates pipeline_data.

Examples

Basic smoothing of all atoms:

>>> pipeline.trajectory.smooth(traj_selection="all", width=5)

Smooth with specific polynomial order:

>>> pipeline.trajectory.smooth(
...     traj_selection="all",
...     width=7,
...     order=3
... )

Notes

  • Dask trajectories (use_memmap=True) handle memory management automatically

  • Smoothing reduces high-frequency noise but may obscure fast dynamics

  • Window width should be odd; even values will be adjusted internally

  • Larger windows create smoother trajectories but lose temporal resolution

  • This operation modifies coordinates but preserves topology
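
To make the roles of width and order concrete, here is a minimal pure-Python sketch of a Savitzky-Golay filter with width=5 and order=2, applied to one coordinate over time. It illustrates the general technique only and is not mdxplain's or MDTraj's code; the name savgol5 and the choice to leave the two frames at each end unchanged are assumptions.

```python
# Standard 5-point, second-order Savitzky-Golay smoothing coefficients.
SG_5_2 = (-3 / 35, 12 / 35, 17 / 35, 12 / 35, -3 / 35)


def savgol5(series):
    """Smooth a 1D series; the two values at each end are left unchanged."""
    out = list(series)
    for i in range(2, len(series) - 2):
        window = series[i - 2:i + 3]
        # Weighted sum equals the value of a local quadratic fit at the center.
        out[i] = sum(c * v for c, v in zip(SG_5_2, window))
    return out


noisy = [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]  # single-frame spike
smoothed = savgol5(noisy)
print(round(smoothed[3], 4))  # 0.4857
```

The spike is damped to 17/35 of its height, while a series that already follows a quadratic in time passes through interior frames unchanged. A wider window averages over more frames and therefore removes more of the fast motion.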

Raises

ValueError

If no trajectories are loaded

ValueError

If traj_selection contains invalid indices/names

ValueError

If width is not positive

ValueError

If atom_selection is invalid or produces no atoms

image_molecules(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, anchor_molecules: np.ndarray | None = None, other_molecules: np.ndarray | None = None, make_whole: bool = True, force: bool = False, inplace: bool = True) -> None

Apply periodic boundary condition imaging to molecules.

This method recenters molecules and wraps them into the primary unit cell using MDTraj’s image_molecules method. By default, the operation is performed in-place, overwriting the original Zarr cache for DaskMDTrajectory.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.image_molecules(traj_selection="all")  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.image_molecules(pipeline_data, traj_selection="all")  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container with trajectory data

traj_selection : int, str, list, or "all"

Selection of trajectories to image (required):

  • int: single trajectory by index

  • str: trajectory name, tag, or pattern (e.g., "tag:system_A", "traj_*")

  • list: multiple indices/names/tags

  • "all": all loaded trajectories

anchor_molecules : np.ndarray, optional

Indices of molecules to anchor at the origin. If None, uses all molecules. Molecules are typically defined by bonded groups in the topology.

other_molecules : np.ndarray, optional

Indices of other molecules to image relative to anchors. If None, uses all molecules not in anchor_molecules.

make_whole : bool, default=True

Make molecules whole across periodic boundary conditions before imaging. This ensures molecules are not split across the box boundary.

force : bool, default=False

Whether to force imaging even when features have been calculated. When True, existing features become invalid and should be recalculated.

inplace : bool, default=True

Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.

Returns

None

Modifies trajectories and updates pipeline_data.

Examples

Basic imaging (default parameters):

>>> pipeline.trajectory.image_molecules(traj_selection="all")

Image with specific anchor molecules:

>>> import numpy as np
>>> # Anchor protein (molecules 0-2), image solvent around it
>>> protein_molecules = np.array([0, 1, 2])
>>> pipeline.trajectory.image_molecules(
...     traj_selection=[0, 1],
...     anchor_molecules=protein_molecules,
...     make_whole=True
... )

Notes

  • Dask trajectories (use_memmap=True) handle memory management automatically

  • Requires trajectory to have periodic boundary condition information

  • Essential for correct visualization of periodic systems

  • Does not change topology or number of atoms

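  • Keep make_whole=True (the default) so that molecules are made whole before imaging; this prevents artifacts from molecules split across the box boundary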
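
The idea of wrapping a molecule into the primary unit cell without breaking it can be sketched for an orthorhombic box. This is a simplified pure-Python illustration and not MDTraj's image_molecules algorithm, which also handles anchors and general box shapes; the function wrap_molecule and its arguments are hypothetical.

```python
import math


def wrap_molecule(coords, box):
    """Shift a whole molecule by box vectors so its geometric center lies in [0, L).

    coords: list of [x, y, z] per atom; box: [Lx, Ly, Lz] of an orthorhombic cell.
    """
    n_atoms = len(coords)
    center = [sum(atom[k] for atom in coords) / n_atoms for k in range(3)]
    # One shift per axis for the whole molecule, so bonded atoms stay together.
    shift = [-math.floor(center[k] / box[k]) * box[k] for k in range(3)]
    return [[atom[k] + shift[k] for k in range(3)] for atom in coords]


box = [10.0, 10.0, 10.0]
water = [[11.0, 1.0, 1.0], [12.0, 1.0, 1.0]]  # drifted into the next cell along x
print(wrap_molecule(water, box))  # [[1.0, 1.0, 1.0], [2.0, 1.0, 1.0]]
```

Because the shift is computed once per molecule rather than per atom, a molecule that straddles the box boundary is moved as a unit instead of being split.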

Raises

ValueError

If no trajectories are loaded

ValueError

If traj_selection contains invalid indices/names

ValueError

If trajectory lacks unit cell information

remove_solvent(pipeline_data: PipelineData, traj_selection: int | str | List[int | str] | str, exclude: list | None = None, force: bool = False, inplace: bool = True) -> None

Remove solvent atoms from trajectories.

This method removes solvent atoms using MDTraj’s remove_solvent method, which identifies and removes common solvent molecules (water, ions, etc.). By default, the operation is performed in-place, overwriting the original Zarr cache for DaskMDTrajectory. The operation modifies both coordinates AND topology, changing the number of atoms in the trajectory. Labels are automatically adjusted to match the new residue structure.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.remove_solvent(traj_selection="all")  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.remove_solvent(pipeline_data, traj_selection="all")  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container with trajectory data

traj_selection : int, str, list, or "all"

Selection of trajectories to process (required):

  • int: single trajectory by index

  • str: trajectory name, tag, or pattern (e.g., "tag:system_A", "traj_*")

  • list: multiple indices/names/tags

  • "all": all loaded trajectories

exclude : list, optional

List of solvent residue names to KEEP (not remove). Common values include ['HOH', 'WAT'] to keep water molecules while removing other solvents. If None, removes all recognized solvent molecules.

force : bool, default=False

Whether to force removal even when features have been calculated. When True, existing features become invalid and should be recalculated.

inplace : bool, default=True

Whether to perform the operation in-place. If True, overwrites the original Zarr cache (for DaskMDTrajectory). If False, creates a new permanent Zarr file.

Returns

None

Modifies trajectories and updates pipeline_data.

Examples

Remove all solvent:

>>> pipeline.trajectory.remove_solvent(traj_selection="all")

Keep water, remove other solvent:

>>> pipeline.trajectory.remove_solvent(
...     traj_selection="all",
...     exclude=['HOH', 'WAT']
... )

Notes

  • Dask trajectories (use_memmap=True) handle memory management automatically

  • This operation CHANGES the number of atoms and topology

  • Labels are automatically filtered to match new residue structure

  • Features must be recalculated after this operation

  • Common solvent names: HOH, WAT, SOL, Na+, Cl-, etc.
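
The effect of exclude is easy to misread, since it lists residues to keep rather than residues to remove. The sketch below shows that logic on per-atom residue names. It is a hypothetical illustration: the helper keep_atom_indices is not a library function, and SOLVENT_NAMES contains only the example names from the note above, not MDTraj's full solvent list.

```python
# Illustrative subset only; the real list of recognized solvents is longer.
SOLVENT_NAMES = {"HOH", "WAT", "SOL", "Na+", "Cl-"}


def keep_atom_indices(residue_names, exclude=None):
    """Return indices of atoms that survive solvent removal.

    residue_names: residue name of each atom; exclude: solvent names to retain.
    """
    retained = set(exclude or [])
    removable = SOLVENT_NAMES - retained
    return [i for i, name in enumerate(residue_names) if name not in removable]


atoms = ["ALA", "ALA", "HOH", "Na+", "HOH"]
print(keep_atom_indices(atoms))                   # [0, 1]
print(keep_atom_indices(atoms, exclude=["HOH"]))  # [0, 1, 2, 4]
```

The surviving index list is shorter than the input, which is why the atom count and topology change and why previously computed features no longer match the trajectory.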

Raises

ValueError

If no trajectories are loaded

ValueError

If traj_selection contains invalid indices/names

join(pipeline_data: PipelineData, target_traj: int | str, source_traj: int | str, check_topology: bool = True, remove_source: bool = True, new_name: str | None = None, force: bool = False) -> None

Join two trajectories along the frame axis (in-place on target).

This method concatenates frames from source_traj to target_traj, creating a single trajectory with combined frames. The target trajectory is modified in-place to contain all frames. Optionally, the source trajectory can be removed after joining, and the target can be renamed.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.join(target_traj=0, source_traj=1)  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.join(pipeline_data, target_traj=0, source_traj=1)  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container with trajectory data

target_traj : int or str

Target trajectory (receives joined frames). Can be index or name.

source_traj : int or str

Source trajectory (provides frames to join). Can be index or name.

check_topology : bool, default=True

Whether to check topology compatibility between trajectories. When True, a ValueError is raised if atom counts differ.

remove_source : bool, default=True

Whether to remove source trajectory after joining. When True, source_traj is deleted from trajectory list.

new_name : str, optional

New name for target trajectory after joining. If None, keeps original target name.

force : bool, default=False

Whether to force join even when features have been calculated. When True, existing features become invalid and should be recalculated.

Returns

None

Modifies target trajectory in-place and optionally removes source.

Examples

Basic join (source removed):

>>> pipeline.trajectory.join(target_traj=0, source_traj=1)

Join and keep source:

>>> pipeline.trajectory.join(
...     target_traj=0,
...     source_traj=1,
...     remove_source=False
... )

Join with renaming:

>>> pipeline.trajectory.join(
...     target_traj="system_A_rep1",
...     source_traj="system_A_rep2",
...     new_name="system_A_combined"
... )

Join without topology check:

>>> pipeline.trajectory.join(
...     target_traj=0,
...     source_traj=1,
...     check_topology=False
... )

Notes

  • Dask trajectories (use_memmap=True) handle memory management automatically

  • Both trajectories must have compatible topologies (same atoms)

  • Time arrays are concatenated; check for time continuity separately

  • Labels are preserved for target trajectory only

  • Source trajectory labels are discarded
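
Joining along the frame axis can be pictured with trajectories stored as nested lists indexed as [frame][atom]. This is a hypothetical sketch of the shape bookkeeping only, not the library's implementation; join_frames is an illustrative name.

```python
def join_frames(target, source, check_topology=True):
    """Concatenate frames of two trajectories given as [frame][atom] lists."""
    if check_topology and len(target[0]) != len(source[0]):
        raise ValueError("atom counts differ between target and source")
    # Frames of the source are appended after the frames of the target.
    return target + source


target = [[(0.0, 0.0, 0.0), (1.0, 0.0, 0.0)] for _ in range(3)]  # 3 frames, 2 atoms
source = [[(0.0, 0.0, 0.1), (1.0, 0.0, 0.1)] for _ in range(2)]  # 2 frames, 2 atoms
joined = join_frames(target, source)
print(len(joined), len(joined[0]))  # 5 2
```

The number of frames adds up while the number of atoms per frame stays fixed, which is why the two trajectories need matching atom counts.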

Raises

ValueError

If no trajectories are loaded

ValueError

If target_traj or source_traj is invalid

ValueError

If target and source refer to the same trajectory

ValueError

If topologies are incompatible (when check_topology=True)

stack(pipeline_data: PipelineData, target_traj: int | str, source_traj: int | str, remove_source: bool = True, new_name: str | None = None, force: bool = False) -> None

Stack two trajectories along the atom axis (creates new trajectory).

This method combines the atoms of source_traj with those of target_traj, creating a single trajectory that contains both sets of atoms. Both trajectories must have the same number of frames. This is useful for combining protein and ligand trajectories or merging different molecular components. The target trajectory is replaced with the stacked result, and optionally the source can be removed.

Warning

When using PipelineManager, do NOT provide the pipeline_data parameter. The PipelineManager automatically injects this parameter.

Pipeline mode:

>>> pipeline = PipelineManager()
>>> pipeline.trajectory.stack(target_traj=0, source_traj=1)  # NO pipeline_data parameter

Standalone mode:

>>> pipeline_data = PipelineData()
>>> manager = TrajectoryManager()
>>> manager.stack(pipeline_data, target_traj=0, source_traj=1)  # pipeline_data required

Parameters

pipeline_data : PipelineData

Pipeline data container with trajectory data

target_traj : int or str

Target trajectory (receives stacked atoms). Can be index or name.

source_traj : int or str

Source trajectory (provides atoms to stack). Can be index or name.

remove_source : bool, default=True

Whether to remove source trajectory after stacking. When True, source_traj is deleted from trajectory list.

new_name : str, optional

New name for target trajectory after stacking. If None, keeps original target name.

force : bool, default=False

Whether to force stack even when features have been calculated. When True, existing features become invalid and should be recalculated.

Returns

None

Replaces target trajectory with stacked result and optionally removes source.

Examples

Basic stack (source removed):

>>> pipeline.trajectory.stack(target_traj=0, source_traj=1)

Stack and keep source:

>>> pipeline.trajectory.stack(
...     target_traj=0,
...     source_traj=1,
...     remove_source=False
... )

Stack with renaming:

>>> pipeline.trajectory.stack(
...     target_traj="protein",
...     source_traj="ligand",
...     new_name="complex"
... )

Notes

  • Dask trajectories (use_memmap=True) handle memory management automatically

  • Both trajectories MUST have same number of frames

  • Topologies are merged to create combined system

  • Labels from both trajectories are combined and renumbered

  • Useful for combining protein + ligand, or multiple chains
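
Stacking along the atom axis is the counterpart of joining along the frame axis. The sketch below uses trajectories stored as nested lists indexed as [frame][atom] to show the shape bookkeeping; it is a hypothetical illustration, stack_atoms is not a library function, and topology and label merging are left out.

```python
def stack_atoms(target, source):
    """Combine atoms of two trajectories frame by frame ([frame][atom] lists)."""
    if len(target) != len(source):
        raise ValueError("frame counts differ between target and source")
    # Each output frame holds the target atoms followed by the source atoms.
    return [t_frame + s_frame for t_frame, s_frame in zip(target, source)]


protein = [[(0.0, 0.0, 0.0), (1.0, 0.0, 0.0)] for _ in range(3)]  # 3 frames, 2 atoms
ligand = [[(5.0, 5.0, 5.0)] for _ in range(3)]                    # 3 frames, 1 atom
stacked = stack_atoms(protein, ligand)
print(len(stacked), len(stacked[0]))  # 3 3
```

Here the number of frames stays fixed while the atoms per frame add up, which is why the frame counts must match exactly.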

Raises

ValueError

If no trajectories are loaded

ValueError

If target_traj or source_traj is invalid

ValueError

If target and source refer to the same trajectory

ValueError

If frame counts differ between trajectories