"""This module contains classes for storing atomic data.
The Frame class may contain multiple Residues which may each contain multiple Atoms.
Both Frame and Residue are iterable. Residue is indexable with either atom numbers or names.
"""
import logging
import pathlib
import typing
import mdtraj
import numpy as np
from . import util
logger = logging.getLogger(__name__)
np.seterr(all="raise")
PathLike = typing.Union[pathlib.Path, str]
[docs]class NonMatchingSystemError(ValueError):
"""Exception raised when topology and trajectory do not match."""
def __init__(self, msg=None):
if msg is None:
msg = "Number of atoms does not match between topology and trajectory files"
super(NonMatchingSystemError, self).__init__(msg)
[docs]def load_traj(filepath: PathLike, **kwargs) -> mdtraj.Trajectory:
"""Load a trajectory, if a PDB fails with zero box volume disable volume check and try again."""
filepath = pathlib.Path(filepath)
if filepath.suffix.lower() == ".pdb":
# PDBs have a couple of things that can go wrong - we handle these here...
kwargs.pop("top", None) # Can't specify a topology for `load_pdb`
try:
return mdtraj.load_pdb(str(filepath), **kwargs)
except FloatingPointError:
# PDB file loading checks density using the simulation box
# This can fail if the box volume is zero
trajectory = mdtraj.load_pdb(str(filepath), no_boxchk=True, **kwargs)
logger.warning(
"Unitcell has zero volume - periodic boundaries will not be accounted for. "
"If the molecule is split by a periodic boundary, results will be incorrect."
)
return trajectory
return mdtraj.load(str(filepath), **kwargs)
[docs]class Frame:
"""Load and store data from a simulation trajectory."""
def __init__(
self,
topology_file: typing.Optional[PathLike] = None,
trajectory_file: typing.Optional[PathLike] = None,
frame_start: int = 0,
frame_end: typing.Optional[int] = None,
):
"""Load a simulation trajectory.
:param topology_file: File containing system topology
:param trajectory_file: File containing simulation trajectory
:param frame_start: First frame of trajectory to use
:param frame_end: Last frame of trajectory to use
"""
if topology_file is not None:
try:
logging.info("Loading topology file")
self._trajectory = load_traj(topology_file)
self._topology = self._trajectory.topology
logging.info("Finished loading topology file")
if trajectory_file is not None:
try:
logging.info("Loading trajectory file - this may take a while")
self._trajectory = load_traj(
trajectory_file, top=self._topology
)
self._trajectory = self._slice_trajectory(
frame_start, frame_end
)
logging.info(
"Finished loading trajectory file - loaded %d frames",
self._trajectory.n_frames,
)
except ValueError as exc:
raise NonMatchingSystemError from exc
self._load_trajectory()
except OSError as exc:
if "no loader" in str(exc) or "format is not supported" in str(exc):
raise UnsupportedFormatException from exc
raise
else:
# No topology - we're probably building a CG frame
self._topology = mdtraj.Topology()
[docs] def _slice_trajectory(
self, frame_start: int = 0, frame_end: typing.Optional[int] = None
) -> mdtraj.Trajectory:
"""Slice input simulation trajectory to a subset of frames.
:param frame_start: First frame of trajectory to use
:param frame_end: Last frame of trajectory to use
"""
traj = self._trajectory
if frame_start != 0:
if frame_end is not None:
traj = traj[frame_start:frame_end]
else:
traj = traj[frame_start:]
return traj
[docs] def _load_trajectory(self) -> None:
"""Load a trajectory into the frame attributes."""
# Improve performance by not double indexing repeatedly
traj = self._trajectory
for atom in self._topology.atoms:
atom.coords = traj.xyz[:, atom.index]
# TODO handle non-cubic boxes
try:
self.unitcell_lengths = traj.unitcell_lengths
self.unitcell_angles = traj.unitcell_angles
except TypeError:
self.unitcell_lengths = None
self.unitcell_angles = None
self.time = traj.time
# Cast unitcell vectors to float32 to avoid warning - they're currently float64
if traj.unitcell_vectors is not None:
traj.unitcell_vectors = traj.unitcell_vectors.astype(np.float32)
@property
def residues(self):
"""Residues in the frame topology."""
return self._topology.residues
[docs] def residue(self, *args, **kwargs) -> mdtraj.core.topology.Residue:
"""Get a residue from the frame topology by name or id."""
return self._topology.residue(*args, **kwargs)
@property
def atoms(self):
"""Atoms in the frame topology."""
return self._topology.atoms
[docs] def atom(self, *args, **kwargs) -> mdtraj.core.topology.Atom:
"""Get an atom from the frame topology by name or id."""
return self._topology.atom(*args, **kwargs)
@property
def natoms(self) -> int:
"""Number of atoms in the frame topology."""
return self._topology.n_atoms
@property
def n_frames(self) -> int:
"""Number of frames in the trajectory."""
return self._trajectory.n_frames
[docs] def add_residue(
self,
name,
chain: typing.Optional[mdtraj.core.topology.Residue] = None,
**kwargs
) -> mdtraj.core.topology.Residue:
"""Add a residue to the frame topology.
:param name: Name of residue
:param chain: MDTraj chain of residue
:return: New residue
"""
if hasattr(self, "_trajectory"):
raise TypeError("Cannot edit residues if a trajectory has been loaded")
if chain is None:
try:
chain = self._topology.chain(0)
except IndexError:
chain = self._topology.add_chain()
return self._topology.add_residue(name, chain, **kwargs)
[docs] def add_atom(
self,
name: str,
element: typing.Optional[mdtraj.element.Element],
residue: mdtraj.core.topology.Residue,
) -> mdtraj.core.topology.Atom:
"""Add an atom or CG bead to the frame topology.
:param name: Name of atom
:param element: MDTraj element of atom
:param residue: MDTraj residue of atom
:return: New atom
"""
if hasattr(self, "_trajectory"):
raise TypeError("Cannot edit atoms if a trajectory has been loaded")
return self._topology.add_atom(name, element, residue)
[docs] def save(
self, filename: PathLike, frame_number: typing.Optional[int] = None, **kwargs
) -> None:
"""Write trajctory to file.
:param filename: Name of output file
:param frame_number: Which trajectory frame to save - defaults to all
"""
traj = self._trajectory
if frame_number is not None:
traj = traj.slice(frame_number)
util.backup_file(filename)
traj.save(str(filename), **kwargs)
[docs] def build_trajectory(self) -> None:
"""Build an MDTraj trajectory from atom coordinates and the values stored as attributes on this frame."""
xyz = np.array([atom.coords for atom in self._topology.atoms], dtype=np.float32)
# We currently have axes: 0 - each atom, 1 - timestep, 2 - xyz coords
# Need to convert to axes: 0 - timestep, 1 - each atom, 2 - xyz coords
try:
xyz = xyz.swapaxes(0, 1)
except np.AxisError:
# No atoms, so make an empty array with the right shape
xyz = np.empty((len(self.time), 0, 3))
optional_values = {
attr: getattr(self, attr, None)
for attr in {"time", "unitcell_lengths", "unitcell_angles"}
}
new_trajectory = mdtraj.Trajectory(
xyz, topology=self._topology, **optional_values
)
if hasattr(self, "_trajectory"):
self._trajectory += new_trajectory
else:
self._trajectory = new_trajectory