Source code for luna.interaction.fp.shell

from itertools import chain
from collections import defaultdict
from enum import Enum
import numpy as np
import mmh3

from luna.version import __version__
from luna.util.exceptions import ShellCenterNotFound
from luna.util.default_values import CHEMICAL_FEATURE_IDS, INTERACTION_IDS
from luna.interaction.fp.fingerprint import DEFAULT_FP_LENGTH, Fingerprint, CountFingerprint
from luna.interaction.fp.type import IFPType
from luna.mol.groups import PseudoAtomGroup, AtomGroupNeighborhood
from luna.mol.features import ChemicalFeature


import logging

logger = logging.getLogger()


PAD_DEFAULT = -9


[docs]class CompoundClassIds(Enum): """An enumeration of compound classes.""" HETATM = 1 RESIDUE = 2 NUCLEOTIDE = 3 WATER = 4 UNKNOWN = 5
[docs]class ShellManager: """Store and manage :class:`Shell` objects. Parameters ---------- num_levels : int The maximum number of iterations for fingerprint generation. radius_step : float The multiplier used to increase shell size at each iteration. At iteration 0, shell radius is 0 * ``radius_step``, at iteration 1, radius is 1 * ``radius_step``, etc. fp_length : int The fingerprint length (total number of bits). ifp_type : :class:`~luna.interaction.fp.type.IFPType` The fingerprint type (EIFP, FIFP, or HIFP). shells : iterable of :class:`Shell`, optional An initial sequence of :class:`Shell` objects (fingerprint features). verbose : bool If True, warnings issued during the usage of this `ShellManager` will be displayed. The default value is False. Attributes ---------- num_levels : int The maximum number of iterations for fingerprint generation. radius_step : float The multiplier used to increase shell size at each iteration. fp_length : int The fingerprint length (total number of bits). ifp_type : :class:`~luna.interaction.fp.type.IFPType` The fingerprint type (EIFP, FIFP, or HIFP). shells : iterable of :class:`Shell` The sequence of shells (fingerprint features). verbose : bool The verbosity state. version : str The LUNA's version with which shells were generated. levels : dict of {int: list of `Shell`} Register shells by level, where keys are levels and values are lists of `Shell` objects. .. note:: Levels are 0-indexed. So, the first level is 0, second is 1, etc. That means if ``num_levels`` is 5, the last level will be 4. centers : dict of dict of {int: `Shell`} Register shells by center, where keys are :class:`~luna.mol.groups.AtomGroup` objects and values are dict that store all shells generated for that center at each iteration (level). """ def __init__(self, num_levels, radius_step, fp_length, ifp_type, shells=None, verbose=False): self.num_levels = num_levels self.radius_step = radius_step self.fp_length = fp_length self.ifp_type = ifp_type self.shells = shells or [] self.verbose = verbose self.version = __version__ self._init_controllers() @property def num_shells(self): """int, read-only: Total number of shells in ``shells``.""" return len(self.shells) @property def num_unique_shells(self): """int, read-only: Total number of unique shells in ``shells``.""" return len(self.unique_shells) @property def unique_shells(self): """iterable of `Shell`, read-only: Unique shells. Return the same as :meth:`get_valid_shells`.""" return self.get_valid_shells()
[docs] def find_similar_shell(self, shell): """Find a shell in ``shells`` similar to ``shell``. Two shells are similar if they represent the same substructural information. Parameters ---------- shell : `Shell` Returns ------- : `Shell` or None Return a similar shell or None if it does not find any. """ for added_shell in self.shells: # For each group of similar shells, it will exist only one valid shell. if added_shell.is_valid(): if shell.is_similar(added_shell): return added_shell return None
[docs] def add_shell(self, shell): """Add a new shell to ``shells``. Parameters ---------- shell : `Shell` """ # Any new shell without interactions from level 1 onwards will be automatically considered invalid. if shell.level > 0 and len(shell.interactions) == 0: shell.valid = False else: found_shell = self.find_similar_shell(shell) if found_shell: if found_shell.level < shell.level: shell.valid = False elif found_shell.level == shell.level and found_shell.identifier <= shell.identifier: shell.valid = False else: found_shell.valid = False self.shells.append(shell) self.levels[shell.level].append(shell) self.centers[shell.central_atm_grp][shell.level] = shell
[docs] def get_valid_shells(self): """Return only valid shells. A shell is considered invalid if, by the time it is added in ``shells``, there is another shell representing the same substructural information. That means this shell is not unique and does not contributes to any new information. On the other hand, if the shell contributes by adding new information to ``shells``, then it will be considered valid and unique. So, the first shell of a series of shells containing the same information is considered valid and the others invalid. Returns ------- : list of `Shell` """ return [s for s in self.shells if s.is_valid()]
[docs] def get_shells_by_identifier(self, identifier, unique_shells=False): """Get shells by identifier. Parameters ---------- identifier : int The shell identifier. unique_shells : bool If True, return only unique shells. Otherwise, return all shells having the identifier ``identifier`` (the default). Returns ------- : list of `Shell` """ if unique_shells: return [s for s in self.shells if s.identifier == identifier and s.is_valid()] else: return [s for s in self.shells if s.identifier == identifier]
[docs] def get_shells_by_level(self, level, unique_shells=False): """Get shells by level (iteration number). Parameters ---------- level : int unique_shells : bool If True, return only unique shells. Otherwise, return all shells at level ``level`` (the default). Returns ------- : list of `Shell` """ shells = [] if level in self.levels: if unique_shells: shells = [s for s in self.levels[level] if s.is_valid()] else: shells = self.levels[level] elif self.verbose: logger.warning("The informed level '%d' does not exist." % level) return shells
[docs] def get_shells_by_center(self, center, unique_shells=False): """Get shells by center (:class:`~luna.mol.groups.AtomGroup` object). Parameters ---------- center : :class:`~luna.mol.groups.AtomGroup` The center of a shell, which consists of an :class:`~luna.mol.groups.AtomGroup` object. unique_shells : bool If True, return only unique shells. Otherwise, return all shells generated for center ``center`` (the default). Returns ------- : dict of {int: `Shell`} All shells generated for center ``center`` at each iteration (key). """ shells = {} if center in self.centers: if unique_shells: shells = {i: s for i, s in self.centers[center].items() if s.is_valid()} else: shells = self.centers[center] elif self.verbose: logger.warning("The informed center '%s' does not exist." % center) return shells
[docs] def get_shell_by_center_and_level(self, center, level, unique_shells=False): """Get the shell generated for center ``center`` (:class:`~luna.mol.groups.AtomGroup` object) at level (iteration) ``level``. Parameters ---------- center : :class:`~luna.mol.groups.AtomGroup` The center of a shell, which consists of an :class:`~luna.mol.groups.AtomGroup` object. level : int The target level (iteration). unique_shells : bool If True, return the `Shell` object if it is unique and None otherwise. The default value is False. Returns ------- : `Shell` or None The shell generated for center ``center`` at level ``level``. If the `Shell` object is not unique, return None. """ shell = self.centers.get(center, {}).get(level) if shell is None and self.verbose: logger.warning("The informed center '%s' does not exist in the level '%d'." % (center, level)) if unique_shells: shell = shell if shell.is_valid() else None if shell is None and self.verbose: logger.warning("The shell found with center '%s' and level '%d' is not unique." % (center, level)) return shell
[docs] def get_previous_shell(self, center, curr_level, unique_shells=False): """Get the last shell having center ``center`` that was generated before level ``curr_level``. For instance, if the current level (iteration) is 5 and the last valid shell generated for center :math:`C` was at level 4, then :meth:`get_previous_shell` would return that shell at level 4. Parameters ---------- center : :class:`~luna.mol.groups.AtomGroup` The center of a shell, which consists of an :class:`~luna.mol.groups.AtomGroup` object. curr_level : int The current level (iteration). unique_shells : bool If True, ignore non-valid shells and go down to inferior levels until a valid shell is found. If level 0 was reached and no valid shell was found, then return None. The default value is False. Returns ------- : `Shell` or None The first previous valid shell or None if no valid shell was found. """ shell = None while curr_level != 0 and shell is None: level = curr_level - 1 shell = self.get_shell_by_center_and_level(center, level, unique_shells) curr_level = level if shell is None and self.verbose: logger.warning("No previous shell centered on '%s' departing from the level '%d' was found." % (center, level)) return shell
[docs] def get_last_shell(self, center, unique_shells=False): """Get the last shell generated for center ``center``. Parameters ---------- center : :class:`~luna.mol.groups.AtomGroup` The center of a shell, which consists of an :class:`~luna.mol.groups.AtomGroup` object. unique_shells : bool If True, ignore non-valid shells. That means shells generated at superior levels may be ignored if they are not valid. The default value is False. Returns ------- : `Shell` or None The last shell generated for center ``center`` or None if no valid shell was found. """ shell = None shells = self.get_shells_by_center(center, unique_shells) if shells: # Sort by key and get a shell based on the latest value. shell = shells[sorted(shells, key=int)[-1]] elif self.verbose: logger.warning("No shell centered on '%s' was found." % center) return shell
[docs] def get_identifiers(self, level=None, unique_shells=False): """Get all shells' identifier. Parameters ---------- level : int, optional If provided, only return identifiers of shells at level ``level``. unique_shells : bool If True, ignore identifiers of non-valid shells. The default value is False. Returns ------- : list of int """ if level is not None: if unique_shells: identifiers = [s.identifier for s in self.get_shells_by_level(level) if s.is_valid()] else: identifiers = [s.identifier for s in self.get_shells_by_level(level)] else: if unique_shells: identifiers = [s.identifier for s in self.shells if s.is_valid()] else: identifiers = [s.identifier for s in self.shells] return sorted(identifiers)
[docs] def to_fingerprint(self, fold_to_length=None, count_fp=False, unique_shells=False): """Encode shells into an interaction fingerprint. Parameters ---------- fold_to_length : int, optional If provided, fold the fingerprint to length ``fold_to_length``. count_fp : bool If True, create a count fingerprint (:class:`~luna.interaction.fp.fingerprint.CountFingerprint`). Otherwise, return a bit fingerprint (:class:`~luna.interaction.fp.fingerprint.Fingerprint`). unique_shells : bool If True, only unique shells are used to create the fingerprint. The default value is False. Returns ------- : :class:`~luna.interaction.fp.fingerprint.CountFingerprint` or :class:`~luna.interaction.fp.fingerprint.Fingerprint` """ indices = self.get_identifiers(unique_shells=unique_shells) props = {"num_levels": self.num_levels, "fp_length": self.fp_length, "radius_step": self.radius_step} if count_fp: fp = CountFingerprint(indices, fp_length=self.fp_length, props=props) else: fp = Fingerprint(indices, fp_length=self.fp_length, props=props) if fold_to_length: return fp.fold(fold_to_length) return fp
[docs] def trace_back_feature(self, feature_id, ifp, unique_shells=False): """Trace a feature from a fingerprint back to the shells that originated that feature. .. note:: Due to fingerprint folding, multiple substructures may end up encoded in the same bit, the so-called collision problem. So, if the provided feature contains collisions, shells representing different substructures may be returned by :meth:`trace_back_feature`. Parameters ---------- feature_id: int The target feature id. ifp : `Fingerprint` The fingerprint containing the feature ``feature_id``. unique_shells : bool If True, ignore identifiers of non-valid shells. The default value is False. Yields ------- : `Shell` Examples -------- In the below example, we will assume a LUNA project object named ``proj_obj`` already exists. Then, we will generate an EIFP fingerprint for the first :class:`~luna.mol.groups.AtomGroupsManager` object at ``proj_obj``. >>> from luna.interaction.fp.shell import ShellGenerator >>> from luna.interaction.fp.type import IFPType >>> atm_grps_mngr = list(proj_obj.atm_grps_mngrs)[0] >>> num_levels, radius_step = 2, 3 >>> sg = ShellGenerator(num_levels, radius_step, ifp_type=IFPType.EIFP) >>> sm = sg.create_shells(atm_grps_mngr) >>> fp = sm.to_fingerprint(fold_to_length=1024, count_fp=True) >>> print(fp.indices) [ 2 19 22 23 34 37 39 45 54 67 71 75 83 84 93 109 138 140 157 162 181 182 186 187 191 194 206 209 211 237 246 251 263 271 281 296 304 315 323 358 370 374 388 392 399 400 419 439 476 481 487 509 519 527 532 578 587 592 604 605 629 635 645 661 668 698 711 713 732 736 740 753 764 795 813 815 820 824 825 831 836 855 873 882 911 926 967 975 976 984 990 996 1020] Now, we can trace features back to original identifiers and investigate its substructural information. >>> ori_indices = list(sm.trace_back_feature(34, fp, unique_shells=True)) >>> print(ori_indices) [(494318626, [<Shell: level=0, radius=0.000000, center=<AtomGroup: [<ExtendedAtom: 3QQK/0/A/GLN/85/CD>, <ExtendedAtom: 3QQK/0/A/GLN/85/NE2>, <ExtendedAtom: 3QQK/0/A/GLN/85/OE1>]>, interactions=0>])] """ for ori_feature in ifp.unfolding_map[feature_id]: yield (ori_feature, self.get_shells_by_identifier(ori_feature, unique_shells))
def _init_controllers(self): levels = defaultdict(list) centers = defaultdict(lambda: defaultdict(lambda: None)) for shell in self.shells: levels[shell.level].append(shell) centers[shell.central_atm_grp][shell.level] = shell self.levels = levels self.centers = centers
[docs]class Shell: """ A container to store substructural information, which is the base for LUNA fingerprints. Shells are centered on an atom or atom group (:class:`~luna.mol.groups.AtomGroup` objects) and represent all atoms and interactions explicitly within it. Parameters ---------- central_atm_grp : :class:`~luna.mol.groups.AtomGroup` The shell center. level : int The level (iteration) at which the shell was generated. radius : float The shell radius. neighborhood : iterable of :class:`~luna.mol.groups.AtomGroup` All atoms and atom groups within a shell of radius ``radius`` centered on ``central_atm_grp``. inter_tuples : iterable of (:class:`~luna.interaction.type.InteractionType`, :class:`~luna.mol.groups.AtomGroup`) All interactions within a shell of radius ``radius`` centered on ``central_atm_grp``. Each tuple contains an :class:`~luna.interaction.type.InteractionType` object and one of the :class:`~luna.mol.groups.AtomGroup` objects participating to the interaction. .. note:: As an interaction involves two participants, it would be expected that each interaction produces two tuples. However, by default, ShellGenerator sorts atom groups and considers only the first tuple that appears, \ which guarantees that only one of the possible tuples is added to avoid information duplication. diff_comp_classes : bool If True (the default), include differentiation between compound classes. That means structural information originated from :class:`~luna.mol.groups.AtomGroup` objects belonging to residues, nucleotides, ligands, or water molecules will be considered different even if their structural information are the same. This is useful for example to differentiate protein-ligand interactions from residue-residue ones. dtype : data-type Use arrays of type ``dtype`` to store information. The default value is np.int64. seed : int A seed to generate shell identifiers through the MurmurHash3 hash function. The default value is 0. manager : `ShellManager` The `ShellManager` object that stores and controls this `Shell` object. valid : bool If the shell is valid or not. By default, all shells are considered valid. feature_mapper : dict, optional A dict that maps atoms and interactions to unique values. If not provided, ``feature_mapper`` will inherit from the default mappings ``CHEMICAL_FEATURE_IDS`` and ``INTERACTION_IDS``. Attributes ---------- central_atm_grp : :class:`~luna.mol.groups.AtomGroup` level : int radius : float diff_comp_classes : bool dtype : data-type seed : int valid : bool feature_mapper : dict """ def __init__(self, central_atm_grp, level, radius, neighborhood=None, inter_tuples=None, diff_comp_classes=True, dtype=np.int64, seed=0, manager=None, valid=True, feature_mapper=None): self.central_atm_grp = central_atm_grp self.level = level self.radius = radius self.valid = valid self.diff_comp_classes = diff_comp_classes if not neighborhood: # If inter_tuples was not defined, initialize the neighborhood as an empty list. # Otherwise, define the neighborhood as the atom groups interacting with the central atom group. neighborhood = [] if not inter_tuples else [x[1] for x in self._inter_tuples] self._neighborhood = set(neighborhood) # Always add the central atom in its own neighborhood. self._neighborhood.add(central_atm_grp) inter_tuples = inter_tuples or [] self._inter_tuples = set(inter_tuples) self._interactions = set([x[0] for x in self._inter_tuples]) # TODO: FIX IT. if not feature_mapper: default_dict = {**CHEMICAL_FEATURE_IDS, **INTERACTION_IDS} # TODO: Built a class for managing the feature maps. # TODO: Attribute a unique id for each new feature. # feature_mapper = defaultdict(lambda: -1, default_dict) feature_mapper = default_dict self.feature_mapper = feature_mapper self.dtype = dtype self.seed = seed self._manager = manager # Encode the interactions. It represents the substructures comprised by this shell. self._encoded_data = self._encode_interactions() # Defined in the hash function self._identifier = self.hash_shell() @property def neighborhood(self): """iterable of :class:`~luna.mol.groups.AtomGroup`, read-only: All atoms and atom groups within this shell.""" return self._neighborhood @property def interactions(self): """iterable of :class:`~luna.interaction.type.InteractionType`, read-only: All interactions within this shell.""" return self._interactions @property def inter_tuples(self): """iterable of tuple, read-only: Each tuple contains an :class:`~luna.interaction.type.InteractionType` object \ and one of the :class:`~luna.mol.groups.AtomGroup` objects participating to the interaction.""" return self._inter_tuples @property def manager(self): """`ShellManager`, read-only: The `ShellManager` object that stores and controls this `Shell` object.""" return self._manager @property def identifier(self): """int, read-only: This shell identifier, which is generated by hashing its encoded data with a hash function. By default, LUNA uses MurmurHash3 as the hash function.""" return self._identifier @property def encoded_data(self): """iterable of tuple, read-only: The data encoded in this shell.""" return self._encoded_data @property def previous_shell(self): """`Shell`, read-only: The previous shell, i.e., a shell centered on the same central \ :class:`~luna.mol.groups.AtomGroup` object from a previous level. \ For example, if this shell is in level 5, return a shell from level 4 having the same center.""" shell = self._manager.get_previous_shell(self.central_atm_grp, self.level) if shell is None: logger.exception("No previous shell centered in '%s' was found." % self.central_atm_grp) raise ShellCenterNotFound("No previous shell centered in '%s' was found." % self.central_atm_grp) return shell
[docs] def is_valid(self): """If the shell is valid or not. Returns ------- : bool """ return self.valid
[docs] def is_similar(self, shell): """ If this shell is similar to ``shell``. Two shells are similar if they represent the same substructural information. Parameters ---------- shell : `Shell` Returns ------- : bool """ # The shells' identifiers will be equal when the shells encode the same information and were constructed in the same level. if self.level == shell.level: if self.identifier == shell.identifier: return True return False # If none of the shells have interactions, check if the shells have equal identifiers. if not self.interactions and not shell.interactions: # If the identifiers are different, but the shells' central group is the same, it means the shells encode the same # information even if their levels are different. # # OBS: this test is only for fast identification without doing a recursive procedure as the one applied in the ELSE statement. if self.central_atm_grp == shell.central_atm_grp: return True # Although two shells contain different identifiers and their centroids are not the same, they can still be equal if they # were obtained in different levels. else: if self.level > shell.level: return self.previous_shell.is_similar(shell) return self.is_similar(shell.previous_shell) # If both shells have interactions, check if the interactions are equal. elif self.interactions and shell.interactions: if self.encoded_data == shell.encoded_data: return True return False
[docs] def hash_shell(self): """Hash this shells' substructural information into a 32-bit integer using MurmurHash3. Returns ------- : int A 32-bit integer representing this shell's substructural information. """ if self.level == 0: # Get the initial data for the first shell according to the IFP type the user has chosen. data = self._initial_shell_data() else: cent_prev_id = self.previous_shell.identifier # Initialization of a new feature vector data = [(self.level, cent_prev_id)] inter_tuples = [] for (inter, nb_atm_grp) in self._inter_tuples: if isinstance(nb_atm_grp, PseudoAtomGroup): # Pseudo-groups that represents only the atoms involved in a specific interaction don't generate shells. # Only their parents do, i.e., the whole group generates shells. So, use it instead. nb_atm_grp = nb_atm_grp.parent_grp prev_nb_shell = self._manager.get_previous_shell(nb_atm_grp, self.level) if prev_nb_shell is None: logger.exception("No previous shell centered in %s was found." % nb_atm_grp) raise ShellCenterNotFound("No previous shell centered in %s was found." % nb_atm_grp) # 1st elem: interaction type. # 2nd elem: previous identifier of the neighbor atom group; inter_tuples.append((self.feature_mapper[inter.type], prev_nb_shell.identifier)) # Sort the tuples to avoid dependence on the order in which tuples are added. sorted_list = sorted(inter_tuples) # Join the interaction information to the feature vector. data += sorted_list np_array = np.array(data, self.dtype) # TODO: Let the user define a hash function hashed_shell = mmh3.hash(np_array, self.seed, signed=False) return hashed_shell
def _initial_shell_data(self): data = [] # EIFP uses atomic invariants. if self.manager.ifp_type == IFPType.EIFP: # Shells use atomic invariants as data. In case of atom groups, the data consists of a list of invariants. data = sorted([atm.invariants for atm in self.central_atm_grp.atoms]) # HIFP uses atomic invariants for atoms and pharmacophore information for atom groups. elif self.manager.ifp_type == IFPType.HIFP: if len(self.central_atm_grp.atoms) == 1: # Shells whose centroid are atoms use invariants as data. data = sorted([atm.invariants for atm in self.central_atm_grp.atoms]) else: # Shells whose centroid are atoms' group use pharmacophore as data. data = [[self.feature_mapper[cf.format_name()] for cf in self.central_atm_grp.features]] # FIFP uses pharmacophore properties for atoms and atoms' group. elif self.manager.ifp_type == IFPType.FIFP: atm_grp_data = [self.feature_mapper[cf.format_name()] for cf in self.central_atm_grp.features] if len(self.central_atm_grp.atoms) == 1: features = set() # It will loop only through one atom and, therefore, it can be removed without # losing information. However, if someday I decide to remove the If, then the code for capturing # all atom derived features will be already working. for atm in self.central_atm_grp.atoms: for atm_grp in atm.atm_grps: if atm_grp != self.central_atm_grp: features.update(atm_grp.features) atm_grp_data += [self.feature_mapper[cf.format_name()] for cf in features] data = [sorted(atm_grp_data)] # # # Include differentiation between compound classes, i.e., groups belonging to Residues, Nucleotides, Ligands, or Waters # are treated as being different even when the group would be considered the same. if self.diff_comp_classes: # `classes` is a list because an atom group can be formed by multiple compounds, which can be from different # (e.g., residue and ligand bound covalently) or same (e.g., amide groups formed by backbone residues) classes. classes = sorted([CompoundClassIds[c.get_class().upper()].value for c in self.central_atm_grp.compounds]) np_data = np.array(data, self.dtype) np_classes = np.array(classes, self.dtype) # Add padding to np_classes if np_data.shape[1] > np_classes.shape[0]: np_classes = np.pad(np_classes, (0, (np_data.shape[1] - np_classes.shape[0])), 'constant', constant_values=PAD_DEFAULT) # Add padding to np_data. elif np_data.shape[1] < np_classes.shape[0]: np_data = np.pad(np_data, ((0, 0), (0, np_classes.shape[0] - np_data.shape[1])), 'constant', constant_values=PAD_DEFAULT) data = np.vstack((np_data, np_classes)) return data def _encode_interactions(self): encoded_data = [] for inter in self.interactions: init_src_shell = self._manager.get_previous_shell(inter.src_grp, 1) init_trgt_shell = self._manager.get_previous_shell(inter.trgt_grp, 1) shell_ids = tuple(sorted([init_src_shell.identifier, init_trgt_shell.identifier])) encoded_data.append((shell_ids, self.feature_mapper[inter.type])) return sorted(encoded_data) def __repr__(self): return ("<Shell: level=%d, radius=%f, center=%s, interactions=%d>" % (self.level, self.radius, self.central_atm_grp, len(self.interactions)))
[docs]class ShellGenerator: """Generate shells, the base information of LUNA fingerprints. Parameters ---------- num_levels : int The maximum number of iterations for fingerprint generation. radius_step : float The multiplier used to increase shell size at each iteration. At iteration 0, shell radius is 0 * ``radius_step``, at iteration 1, radius is 1 * ``radius_step``, etc. fp_length : int The fingerprint length (total number of bits). The default value is :math:`2^{32}`. ifp_type : :class:`~luna.interaction.fp.type.IFPType` The fingerprint type (EIFP, FIFP, or HIFP). The default value is EIFP. diff_comp_classes : bool If True (the default), include differentiation between compound classes. That means structural information originated from :class:`~luna.mol.groups.AtomGroup` objects belonging to residues, nucleotides, ligands, or water molecules will be considered different even if their structural information are the same. This is useful for example to differentiate protein-ligand interactions from residue-residue ones. dtype : data-type Use arrays of type ``dtype`` to store information. The default value is np.int64. seed : int A seed to generate shell identifiers through the MurmurHash3 hash function. The default value is 0. bucket_size : int Bucket size of KD tree. You can play around with this to optimize speed if you feel like it. The default value is 10. Attributes ---------- num_levels : int radius_step : float fp_length : int ifp_type : :class:`~luna.interaction.fp.type.IFPType` diff_comp_classes : bool dtype : data-type seed : int bucket_size : int Examples -------- In the below example, we will assume a LUNA project object named ``proj_obj`` already exists. First, let's define a `ShellGenerator` object that will create shells over 2 iterations (levels). At each iteration, the shell radius will be increased by 3 and substructural information will be encoded following EIFP definition. >>> from luna.interaction.fp.shell import ShellGenerator >>> from luna.interaction.fp.type import IFPType >>> num_levels, radius_step = 2, 3 >>> sg = ShellGenerator(num_levels, radius_step, ifp_type=IFPType.EIFP) After defining the generator, we can create shells by calling :meth:`create_shells`, which expects an :class:`~luna.mol.groups.AtomGroupsManager` object. In this example, we will the first :class:`~luna.mol.groups.AtomGroupsManager` object from an existing LUNA project (``proj_obj``). >>> atm_grps_mngr = list(proj_obj.atm_grps_mngrs)[0] >>> sm = sg.create_shells(atm_grps_mngr) >>> print(sm.num_shells) 528 Now, with shells stored in the `ShellManager` object you can, for instance: * Generate fingerprints: >>> fp = sm.to_fingerprint(fold_to_length=1024) >>> print(fp.indices) [ 2 19 22 23 34 37 39 45 54 67 71 75 83 84 93 109 138 140 157 162 181 182 186 187 191 194 206 209 211 237 246 251 263 271 281 296 304 315 323 358 370 374 388 392 399 400 419 439 476 481 487 509 519 527 532 578 587 592 604 605 629 635 645 661 668 698 711 713 732 736 740 753 764 795 813 815 820 824 825 831 836 855 873 882 911 926 967 975 976 984 990 996 1020] * Visualize substructural information in Pymol\: >>> from luna.interaction.fp.view import ShellViewer >>> shell_tuples = [(atm_grps_mngr.entry, sm.unique_shells, proj_obj.pdb_path)] >>> sv = ShellViewer() >>> sv.new_session(shell_tuples, "example.pse") """ def __init__(self, num_levels, radius_step, fp_length=DEFAULT_FP_LENGTH, ifp_type=IFPType.EIFP, diff_comp_classes=True, dtype=np.int64, seed=0, bucket_size=10): self.num_levels = num_levels self.radius_step = radius_step self.fp_length = fp_length self.ifp_type = ifp_type self.diff_comp_classes = diff_comp_classes self.seed = seed self.dtype = dtype self.bucket_size = bucket_size
[docs] def create_shells(self, atm_grps_mngr): """Perceive substructural information from :class:`~luna.mol.groups.AtomGroup` objects and their interactions, and represent such information as shells. Parameters ---------- atm_grps_mngr : :class:`~luna.mol.groups.AtomGroupsManager` Container of :class:`~luna.mol.groups.AtomGroup` objects and their interactions. Returns ------- : `ShellManager` Raises ------ ShellCenterNotFound If it fails to recover a shell having a given center. """ sm = ShellManager(self.num_levels, self.radius_step, self.fp_length, self.ifp_type) all_interactions = atm_grps_mngr.get_all_interactions() neighborhood = set(atm_grps_mngr.atm_grps) # Create pseudo-groups for hydrophobic interactions. pseudo_grps = set(self._generate_pseudo_grps(atm_grps_mngr)) # Create a neighborhood scope for the searches search_scope = neighborhood | pseudo_grps nbs = AtomGroupNeighborhood(search_scope, self.bucket_size) # Map an atom to all of its pseudo-groups. pseudo_grps_mapping = defaultdict(set) for pseudo_grp in pseudo_grps: atoms = tuple(sorted(pseudo_grp.atoms)) for atm in atoms: # Add an atom to the mapping pseudo_grps_mapping[(atm,)].add(pseudo_grp) # Add a list of atoms pseudo_grps_mapping[atoms].add(pseudo_grp) # It controls which atom groups already converged. skip_atm_grps = set() # Sort the atom groups to avoid order dependence. sorted_neighborhood = sorted(neighborhood) level = -1 for level in range(self.num_levels): radius = self.radius_step * level for atm_grp in sorted_neighborhood: # Ignore centroids that already reached the limit of possible substructures. if atm_grp in skip_atm_grps: continue # It stores all possible expansions each group can do. Each expansion is a derived group. # Initially, the list contains only the groups derived from the central atom group. # But, later it may also contain derived groups from interacting partner groups. all_derived_atm_grps = self._get_derived_grps(atm_grp, pseudo_grps_mapping) # In level 0, the number of unique derived groups is 0 as the shell initially only contains # information of the centroid. unique_derived_atm_grps = [] shell = None if radius > 0: prev_shell = sm.get_previous_shell(atm_grp, level) if not prev_shell: logger.exception("No previous shell centered in %s was found." % atm_grp) raise ShellCenterNotFound("There are no shells initialized to the atom group '%s'." % atm_grp) prev_atm_grps = prev_shell.neighborhood prev_interactions = prev_shell.interactions nb_atm_grps = set(nbs.search(atm_grp.centroid, radius)) inter_tuples = set() interactions_to_add = set() # For each atom group from the previous shell. for prev_atm_grp in sorted(prev_atm_grps): # Include the partner's derived groups to the set of all derived groups. all_derived_atm_grps.update(self._get_derived_grps(prev_atm_grp, pseudo_grps_mapping)) for inter in prev_atm_grp.interactions: # Only PseudoAtomGroup objects should deal with hydrophobic interactions. if isinstance(prev_atm_grp, PseudoAtomGroup) is False and inter.type == "Hydrophobic": continue partner_grp = self._recover_partner_grp(prev_atm_grp, inter, pseudo_grps_mapping) if partner_grp is not None: if inter not in interactions_to_add and partner_grp in nb_atm_grps: new_tuple = (inter, partner_grp) # Ignore interactions that already exists in the previous shell to avoid duplications # in the list of interactions. For example, without this control, an interaction I # between atom A1 and A2 would appear twice in the list: (I, A1) and (I, A2). # Thus, it keeps only the first interaction that appears while increasing the shell. if inter in prev_interactions and new_tuple not in prev_shell.inter_tuples: continue inter_tuples.add(new_tuple) interactions_to_add.add(inter) # Get valid derived groups, which are those ones inside the current sphere. valid_derived_atm_grps = set([ag for ag in all_derived_atm_grps if ag in nb_atm_grps]) unique_derived_atm_grps = valid_derived_atm_grps - prev_atm_grps # It adds a new shell only if there are new interactions and derived atom groups inside the shell. shell_nb = set([x[1] for x in inter_tuples]) | valid_derived_atm_grps shell_nb.add(atm_grp) shell = Shell(atm_grp, level, radius, neighborhood=shell_nb, inter_tuples=inter_tuples, manager=sm, diff_comp_classes=self.diff_comp_classes, seed=self.seed, dtype=self.dtype) else: shell = Shell(atm_grp, level, radius, manager=sm, diff_comp_classes=self.diff_comp_classes, seed=self.seed, dtype=self.dtype) if shell: sm.add_shell(shell) last_shell = shell else: last_shell = sm.get_last_shell(atm_grp) # Evaluate if the limit of possible substructures for the current centroid (atom group) was reached. if last_shell: # The limit will be reached when the last shell already contains all interactions # established by the atom groups inside the shell. In this case, expanding the radius # will not result in any new shell because a shell is only created when the atoms inside # the last shell establish interactions with the atom groups found after increasing the radius. all_nb_interactions = set(chain.from_iterable([g.interactions for g in last_shell.neighborhood])) # It considers only interactions whose atom groups exist in the neighborhood. valid_interactions = set([i for i in all_nb_interactions if i.src_grp in neighborhood and i.trgt_grp in neighborhood]) # It identifies if the convergence for the interactions was reached. interactions_converged = valid_interactions == last_shell.interactions # It identifies if the convergence for the expansion of atom groups was reached, which happens when all derived groups # were already included in the centroid neighborhood. However, it may happen that this requirement was fulfilled # by the time new unique derived groups were included in the centroid neighborhood. Thus, the second test provides # a chance to all of these recently discovered groups to expand. grp_expansions_converged = all_derived_atm_grps == last_shell.neighborhood and len(unique_derived_atm_grps) == 0 # The local convergence is reached when no atom group inside the current sphere can expand or all of its interactions # were already included to the sphere. local_convergence = interactions_converged and grp_expansions_converged # The global convergence occurs when all interactions in the binding site were already included in the sphere. global_convergence = all_interactions == last_shell.interactions # If the limit was reached for this centroid, in the next level it can be ignored. if local_convergence or global_convergence: skip_atm_grps.add(atm_grp) # If all atom groups reached the limit of possible substructures, just leave the loop. if len(skip_atm_grps) == len(neighborhood): logger.debug("The list of shells cannot be expanded anymore. The maximum number " "of substructures were reached.") break logger.debug("Shells creation finished.") logger.debug("The last level executed was: %d." % level) logger.debug("The number of levels defined was: %d." % self.num_levels) logger.debug("Total number of shells created: %d" % sm.num_shells) logger.debug("Total number of unique shells created: %d" % sm.num_unique_shells) return sm
def _generate_pseudo_grps(self, atm_grps_mngr): pseudo_grps = {} mapped_interactions = set() for hydrop_grp in atm_grps_mngr.filter_by_types(["Hydrophobe"]): for inter in hydrop_grp.interactions: # Ignore non-hydrophobic interactions or interactions already mapped. if inter.type != "Hydrophobic" or inter in mapped_interactions: continue src_tuple = (inter.src_grp, tuple(sorted(inter.src_interacting_atms))) trgt_tuple = (inter.trgt_grp, tuple(sorted(inter.trgt_interacting_atms))) for atm_grp, atms in [src_tuple, trgt_tuple]: # It will get a pseudo-group already created or create a new one. pseudo_grp = pseudo_grps.get(atms, PseudoAtomGroup(atm_grp, atms, [ChemicalFeature("Hydrophobe")])) pseudo_grp.add_interactions([inter]) pseudo_grps[atms] = pseudo_grp mapped_interactions.add(inter) return pseudo_grps.values() def _get_derived_grps(self, atm_grp, pseudo_grps_mapping): # Get derived groups for the informed atom group. derived_atm_grps = set([ag for a in atm_grp.atoms for ag in a.atm_grps]) # Get derived pseudo-groups for the informed atom group. derived_atm_grps.update(set([pseudo_grp for a in atm_grp.atoms for pseudo_grp in pseudo_grps_mapping.get((a,), [])])) return derived_atm_grps def _recover_partner_grp(self, atm_grp, interaction, pseudo_grps_mapping): if isinstance(atm_grp, PseudoAtomGroup): atoms = sorted(atm_grp.atoms) src_atms = sorted(interaction.src_interacting_atms) trgt_atms = sorted(interaction.trgt_interacting_atms) if atoms == src_atms: partner_atms = trgt_atms elif atoms == trgt_atms: partner_atms = src_atms else: return None partner_grp = None # Recover the partner groups based on the interacting atoms. Then, it will check which returned # pseudo-group corresponds to the interacting atoms, i.e., the one that contains exactly the same atoms. # This verification is mainly necessary for pseudo-groups composed only by one atom as such groups tend # to belong to more than one pseudo-group. for pseudo_grp in pseudo_grps_mapping.get(tuple(partner_atms), []): if sorted(pseudo_grp.atoms) == partner_atms: partner_grp = pseudo_grp break return partner_grp else: return interaction.get_partner(atm_grp)