#!/usr/bin/env python3
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
#
# Copyright (c) 2022 Authors and contributors
# (see the AUTHORS.rst file for the full list of names)
#
# Released under the GNU Public Licence, v3 or any higher version
# SPDX-License-Identifier: GPL-3.0-or-later
"""Base class for building Analysis classes."""
import inspect
import logging
from datetime import datetime
import MDAnalysis.analysis.base
import numpy as np
from MDAnalysis.analysis.base import Results
from MDAnalysis.lib.log import ProgressBar
from tqdm.contrib.logging import logging_redirect_tqdm
from .._version import get_versions
from ..lib.math import center_cluster, new_mean, new_variance
from ..lib.util import (
atomgroup_header,
correlation_analysis,
get_cli_input,
get_compound,
render_docs,
)
__version__ = get_versions()['version']
del get_versions
logger = logging.getLogger(__name__)
[docs]@render_docs
class AnalysisBase(MDAnalysis.analysis.base.AnalysisBase):
"""Base class derived from MDAnalysis for defining multi-frame analysis.
The class is designed as a template for creating multi-frame analyses.
This class will automatically take care of setting up the trajectory
reader for iterating, and it offers to show a progress meter.
Computed results are stored inside the :attr:`results` attribute.
To define a new analysis, `AnalysisBase` needs to be subclassed
and :meth:`_single_frame` must be defined. It is also possible to define
:meth:`_prepare` and :meth:`_conclude` for pre- and post-processing.
All results should be stored as attributes of the
:class:`MDAnalysis.analysis.base.Results` container.
Parameters
----------
${ATOMGROUPS_PARAMETER}
${BASE_CLASS_PARAMETERS}
multi_group : bool
Analysis is able to work with list of atomgroups
Attributes
----------
atomgroup : MDAnalysis.core.groups.AtomGroup
Atomgroup taken for the Analysis (available if `multi_group = False`)
atomgroups : list[MDAnalysis.core.groups.AtomGroup]
Atomgroups taken for the Analysis (available if `multi_group = True`)
n_atomgroups : int
Number of atomngroups (available if `multi_group = True`)
_universe : MDAnalysis.core.universe.Universe
The Universe the atomgroups belong to
_trajectory : MDAnalysis.coordinates.base.ReaderBase
The trajectory the atomgroups belong to
times : numpy.ndarray
array of Timestep times. Only exists after calling
:meth:`AnalysisBase.run`
frames : numpy.ndarray
array of Timestep frame indices. Only exists after calling
:meth:`AnalysisBase.run`
_frame_index : int
index of the frame currently analysed
_index : int
Number of frames already analysed (same as _frame_index + 1)
results : MDAnalysis.analysis.base.Results
results of calculation are stored after call
to :meth:`AnalysisBase.run`
_obs : MDAnalysis.analysis.base.Results
Observables of the current frame
_obs.box_center : numpy.ndarray
Center of the simulation cell of the current frame
means : MDAnalysis.analysis.base.Results
Means of the observables.
Keys are the same as :attr:`_obs`.
sems : MDAnalysis.analysis.base.Results
Standard errors of the mean of the observables.
Keys are the same as :attr:`_obs`
Raises
------
ValueError
If any of the provided AtomGroups (`atomgroups` or `refgroup`) does
not contain any atoms.
"""
def __init__(self,
atomgroups,
multi_group=False,
refgroup=None,
unwrap=False,
jitter=None,
concfreq=0):
if multi_group:
if type(atomgroups) not in (list, tuple):
atomgroups = [atomgroups]
# Check that all atomgroups are from same universe
if len(set([ag.universe for ag in atomgroups])) != 1:
raise ValueError("Atomgroups belong to different Universes")
# Sort the atomgroups,
# such that molecules are listed one after the other
self.atomgroups = atomgroups
for i, ag in enumerate(self.atomgroups):
if ag.n_atoms == 0:
raise ValueError(f"The {i+1}. provided `atomgroup`"
"does not contain any atoms.")
self.n_atomgroups = len(self.atomgroups)
self._universe = atomgroups[0].universe
self._allow_multiple_atomgroups = True
else:
self.atomgroup = atomgroups
if self.atomgroup.n_atoms == 0:
raise ValueError("The provided `atomgroup` does not contain "
"any atoms.")
self._universe = atomgroups.universe
self._allow_multiple_atomgroups = False
self._trajectory = self._universe.trajectory
self.refgroup = refgroup
self.unwrap = unwrap
self.jitter = jitter
self.concfreq = concfreq
if self.refgroup is not None and self.refgroup.n_atoms == 0:
raise ValueError("The provided `refgroup` does not contain "
"any atoms.")
super(AnalysisBase, self).__init__(trajectory=self._trajectory)
@property
def box_center(self):
"""Center of the simulation cell."""
return self._universe.dimensions[:3] / 2
[docs] def run(self, start=None, stop=None, step=None, verbose=None):
"""Iterate over the trajectory.
Parameters
----------
start : int
start frame of analysis
stop : int
stop frame of analysis
step : int
number of frames to skip between each analysed frame
verbose : bool
Turn on verbosity
"""
logger.info("Choosing frames to analyze")
# if verbose unchanged, use class default
verbose = getattr(self, '_verbose',
False) if verbose is None else verbose
self._setup_frames(self._trajectory, start, stop, step)
logger.info("Starting preparation")
if self.refgroup is not None:
if not hasattr(self.refgroup, 'masses') \
or np.sum(self.refgroup.masses) == 0:
logger.warning("No masses available in refgroup, falling back "
"to center of geometry")
ref_weights = np.ones_like(self.refgroup.atoms)
else:
ref_weights = self.refgroup.masses
compatible_types = [np.ndarray, float, int, list, np.float_, np.int_]
self._prepare()
# Log bin information if a spatial analysis is run.
if hasattr(self, "n_bins"):
logger.info(f"Using {self.n_bins} bins.")
module_has_save = callable(getattr(self.__class__, 'save', None))
timeseries = np.zeros(self.n_frames)
for i, ts in enumerate(ProgressBar(
self._trajectory[self.start:self.stop:self.step],
verbose=verbose)):
self._frame_index = i
self._index = self._frame_index + 1
self._ts = ts
self.frames[i] = ts.frame
self.times[i] = ts.time
# Before we do any coordinate transformation we first unwrap
# the system to avoid artifacts of later wrapping.
if self.unwrap:
self._universe.atoms.unwrap(
compound=get_compound(self._universe.atoms))
if self.refgroup is not None:
com_refgroup = center_cluster(self.refgroup, ref_weights)
t = self.box_center - com_refgroup
self._universe.atoms.translate(t)
self._universe.atoms.wrap(
compound=get_compound(self._universe.atoms))
if self.jitter:
ts.positions += np.random.random(
size=(len(ts.positions), 3)) * self.jitter
self._obs = Results()
timeseries[i] = self._single_frame()
# This try/except block is used because it will fail only once and
# is therefore not a performance issue like a if statement would be.
try:
for key in self._obs.keys():
if type(self._obs[key]) is list:
self._obs[key] = \
np.array(self._obs[key])
old_mean = self.means[key]
old_var = self.sems[key]**2 * (self._index - 1)
self.means[key] = \
new_mean(self.means[key],
self._obs[key], self._index)
self.sems[key] = \
np.sqrt(new_variance(old_var, old_mean,
self.means[key],
self._obs[key],
self._index) / self._index)
except AttributeError:
with logging_redirect_tqdm():
logger.info("Preparing error estimation.")
# the means and sems are not yet defined.
# We initialize the means with the data from the first frame
# and set the sems to zero (with the correct shape).
self.means = self._obs.copy()
self.sems = Results()
for key in self._obs.keys():
if type(self._obs[key]) not in compatible_types:
raise TypeError(
f"Obervable {key} has uncompatible type.")
self.sems[key] = \
np.zeros(np.shape(self._obs[key]))
if self.concfreq and self._index % self.concfreq == 0 \
and self._frame_index > 0:
self._conclude()
if module_has_save:
self.save()
logger.info("Finishing up")
self.corrtime = correlation_analysis(timeseries)
self._conclude()
if self.concfreq and module_has_save:
self.save()
return self
[docs] def savetxt(self, fname, X, columns=None):
"""Save to text.
An extension of the numpy savetxt function. Adds the command line
input to the header and checks for a doubled defined filesuffix.
Return a header for the text file to save the data to.
This method builds a generic header that can be used by any MAICoS
module. It is called by the save method of each module.
The information it collects is:
- timestamp of the analysis
- name of the module
- version of MAICoS that was used
- command line arguments that were used to run the module
- module call including the default arguments
- number of frames that were analyzed
- atomgroups that were analyzed
- output messages from modules and base classes (if they exist)
"""
# Get the required information first
current_time = datetime.now().strftime("%a, %b %d %Y at %H:%M:%S ")
module_name = self.__class__.__name__
# Here the specific output messages of the modules are collected.
# We only take into account maicos modules and start at the top of the
# module tree. Submodules without an own OUTPUT inherit from the parent
# class, so we want to remove those duplicates.
messages = []
for cls in self.__class__.mro()[-3::-1]:
if hasattr(cls, 'OUTPUT'):
if cls.OUTPUT not in messages:
messages.append(cls.OUTPUT)
messages = '\n'.join(messages)
# Get information on the analyzed atomgroup
atomgroups = ''
if self._allow_multiple_atomgroups:
for i, ag in enumerate(self.atomgroups):
atomgroups += f" ({i + 1}) {atomgroup_header(ag)}\n"
else:
atomgroups += f" (1) {atomgroup_header(self.atomgroup)}\n"
header = (
f"This file was generated by {module_name} on {current_time}\n\n"
f"{module_name} is part of MAICoS v{__version__}\n\n"
f"Command line:"
f" {get_cli_input()}\n"
f"Module input:"
f" {module_name}{inspect.signature(self.__init__)}"
f".run({inspect.signature(self.run)})\n\n"
f"Statistics over {self._index} frames\n\n"
f"Considered atomgroups:\n"
f"{atomgroups}\n"
f"{messages}\n\n"
)
if columns is not None:
header += '|'.join([f"{i:^26}"for i in columns])[2:]
fname = "{}{}".format(fname, (not fname.endswith('.dat')) * '.dat')
np.savetxt(fname, X, header=header, fmt='% .18e ', encoding='utf8')
[docs]@render_docs
class ProfileBase:
"""Base class for computing profiles.
Parameters
----------
${ATOMGROUPS_PARAMETER}
${PROFILE_CLASS_PARAMETERS}
${PROFILE_CLASS_PARAMETERS_PRIVATE}
Attributes
----------
${PROFILE_CLASS_ATTRIBUTES}
"""
def __init__(self,
atomgroups,
weighting_function,
normalization,
grouping,
bin_method,
output,
f_kwargs=None,):
self.atomgroups = atomgroups
self.normalization = normalization.lower()
self.grouping = grouping.lower()
self.bin_method = bin_method.lower()
self.output = output
if f_kwargs is None:
f_kwargs = {}
self.weighting_function = lambda ag: weighting_function(ag,
grouping,
**f_kwargs)
# We need to set the following dictionaries here because ProfileBase
# is not a subclass of AnalysisBase (only needed for tests)
self.results = Results()
self._obs = Results()
def _prepare(self):
normalizations = ["none", "volume", "number"]
if self.normalization not in normalizations:
raise ValueError(f"`{self.normalization}` not supported. "
f"Use {', '.join(normalizations)}.")
groupings = ["atoms", "segments", "residues", "molecules", "fragments"]
if self.grouping not in groupings:
raise ValueError(f"`{self.grouping}` is not a valid option for "
f"grouping. Use {', '.join(groupings)}.")
# If unwrap has not been set we define it here
if not hasattr(self, "unwrap"):
self.unwrap = True
if self.unwrap and self.grouping == "atoms":
logger.warning("Unwrapping in combination with atom grouping "
"is superfluous. `unwrap` will be set to `False`.")
self.unwrap = False
bin_methods = ["cog", "com", "coc"]
if self.bin_method not in bin_methods:
raise ValueError(f"`{self.bin_method}` is an unknown binning "
f"method. Use {', '.join(bin_methods)}.")
if self.normalization == 'number':
self.tot_bincount = np.zeros((self.n_bins, self.n_atomgroups))
def _compute_histogram(self, positions, weights=None):
"""Calculate histogram based on positions.
Parameters
----------
positions : numpy.ndarray
positions
weights : numpy.ndarray
weights for the histogram.
Returns
-------
hist : numpy.ndarray
histogram
"""
raise NotImplementedError("Only implemented in child classes")
def _single_frame(self):
self._obs.profile = np.zeros((self.n_bins, self.n_atomgroups))
for index, selection in enumerate(self.atomgroups):
if self.grouping == 'atoms':
positions = selection.atoms.positions
else:
kwargs = dict(compound=self.grouping)
if self.bin_method == "cog":
positions = selection.atoms.center_of_geometry(**kwargs)
elif self.bin_method == "com":
positions = selection.atoms.center_of_mass(**kwargs)
elif self.bin_method == "coc":
positions = selection.atoms.center_of_charge(**kwargs)
weights = self.weighting_function(selection)
profile = self._compute_histogram(positions, weights)
if self.normalization == 'number':
bincount = self._compute_histogram(positions, weights=None)
self.tot_bincount[:, index] += bincount
# If a bin does not contain any particles we divide by 0.
with np.errstate(invalid='ignore'):
profile /= bincount
profile = np.nan_to_num(profile)
elif self.normalization == "volume":
profile /= self._obs.bin_volume
self._obs.profile[:, index] = profile
def _conclude(self):
self.results.profile = self.means.profile
self.results.dprofile = self.sems.profile
if self.normalization == 'number':
no_occurences_idx = self.tot_bincount == 0
self.results.profile[no_occurences_idx] = np.nan
self.results.dprofile[no_occurences_idx] = np.nan
[docs] def save(self):
"""Save results of analysis to file."""
columns = ["positions [Å]"]
for i, _ in enumerate(self.atomgroups):
columns.append(f'({i + 1}) profile')
for i, _ in enumerate(self.atomgroups):
columns.append(f'({i + 1}) error')
# Required attribute to use method from `AnalysisBase`
self._allow_multiple_atomgroups = True
AnalysisBase.savetxt(self,
self.output,
np.hstack((self.results.bin_pos[:, np.newaxis],
self.results.profile,
self.results.dprofile)),
columns=columns)