Skip to content

Commit

Permalink
refactor: move to separate class
Browse files Browse the repository at this point in the history
  • Loading branch information
cbouy committed Dec 26, 2024
1 parent 908c442 commit 4e34243
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 136 deletions.
142 changes: 6 additions & 136 deletions prolif/fingerprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,144 +729,14 @@ def _run_bridged_analysis(self, traj, lig, prot, water, order=1, **kwargs):
order: int
Treshold for water bridge order
""" # noqa: E501
kwargs.pop("n_jobs", None)
residues = kwargs.pop("residues", None)
fp = Fingerprint(
interactions=["HBDonor", "HBAcceptor"],
parameters=self.parameters,
count=self.count,
)

# Run analysis for ligand-water interactions
ifp_stores: list[dict[int, IFP]] = [
fp._run_serial(traj, lig, water, residues=None, **kwargs),
]
# circular import
from prolif.interactions.water_bridge import WaterBridge

# Run water-water interaction analysis if order is 2 or above
if order >= 2:
ifp_stores.append(fp._run_serial(traj, water, water, residues=None, **kwargs),)

# Run water-protein analysis
ifp_stores.append(fp._run_serial(traj, water, prot, residues=residues, **kwargs),)

# merge results from the 2 runs on matching water residues
self.ifp = getattr(self, "ifp", {})
for (frame, ifp1), ifp2 in zip(ifp_stores[0].items(), ifp_stores[-1].values()):
# for each ligand-water interaction in ifp1
for data1 in ifp1.interactions():
# for each water-protein interaction in ifp2 where water1 == water2
for data2 in [
d2 for d2 in ifp2.interactions() if d2.ligand == data1.protein
]:
# construct merged metadata
metadata = (
{
"indices": {
"ligand": data1.metadata["indices"]["ligand"],
"protein": data2.metadata["indices"]["protein"],
"water": tuple(
set().union(
data1.metadata["indices"]["protein"],
data2.metadata["indices"]["ligand"],
)
),
},
"parent_indices": {
"ligand": data1.metadata["parent_indices"]["ligand"],
"protein": data2.metadata["parent_indices"]["protein"],
"water": tuple(
set().union(
data1.metadata["parent_indices"]["protein"],
data2.metadata["parent_indices"]["ligand"],
)
),
},
"water_residue": data1.protein,
"water_bridge_order": 1,
"ligand_role": data1.interaction,
"protein_role": ( # invert role
"HBDonor"
if data2.interaction == "HBAcceptor"
else "HBAcceptor"
),
**{
f"{key}{suffix}": data.metadata[key]
for suffix, data in [
("_ligand_water", data1),
("_water_protein", data2),
]
for key in ["distance", "DHA_angle"]
},
},
)

# store metadata
ifp = self.ifp.setdefault(frame, IFP())
ifp.setdefault((data1.ligand, data2.protein), {}).setdefault(
"WaterBridge", []
).append(metadata)

# Run water 2nd Order if order 2 or more
if order >= 2:
print("Matching ligand-water → water-water → water-protein:")
for frame in ifp_stores[0].keys():
ifp1 = ifp_stores[0][frame] # Ligand → Water
ifp2 = ifp_stores[1][frame] # Water → Water
ifp3 = ifp_stores[2][frame] # Water → Protein

# Chain ligand-water → water-water → water-protein
for data1 in ifp1.interactions():
for data2 in [d2 for d2 in ifp2.interactions() if d2.ligand == data1.protein]:
for data3 in [d3 for d3 in ifp3.interactions() if d3.ligand == data2.protein]:
metadata = {
"indices": {
"ligand": data1.metadata["indices"]["ligand"],
"protein": data3.metadata["indices"]["protein"],
"water_1": tuple(set().union(
data1.metadata["indices"]["protein"],
data2.metadata["indices"]["ligand"],
)),
"water_2": tuple(set().union(
data2.metadata["indices"]["protein"],
data3.metadata["indices"]["ligand"],
)),
},
"parent_indices": {
"ligand": data1.metadata["parent_indices"]["ligand"],
"protein": data3.metadata["parent_indices"]["protein"],
"water": tuple(set().union(
data1.metadata["parent_indices"]["protein"],
data2.metadata["parent_indices"]["ligand"],
)),
"water": tuple(set().union(
data2.metadata["parent_indices"]["protein"],
data3.metadata["parent_indices"]["ligand"],
)),
},
"water_residue_1": data1.protein, # Initial water residue
"water_residue_2": data2.protein, # Initial water residue
"ligand_role": data1.interaction,
"protein_role": ("HBDonor" if data3.interaction == "HBAcceptor" else "HBAcceptor"),
"water_bridge_order": 2, # Set order to 2 for extended matching
**{
f"{key}_ligand_water": data1.metadata[key]
for key in ["distance", "DHA_angle"]
},
**{
f"{key}_water_water": data2.metadata[key]
for key in ["distance", "DHA_angle"]
},
**{
f"{key}_water_protein": data3.metadata[key]
for key in ["distance", "DHA_angle"]
},
}

# Store metadata
ifp = self.ifp.setdefault(frame, IFP())
ifp.setdefault((data1.ligand, data3.protein), {}).setdefault(
"WaterBridge", []
).append(metadata)
water_bridge = WaterBridge(
parameters=self.parameters, count=self.count, ifp_store=self.ifp, **kwargs
)
water_bridge.run(traj, lig, prot, water, order)
return self

def to_dataframe(
Expand Down
210 changes: 210 additions & 0 deletions prolif/interactions/water_bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
"""Module for the WaterBridge interaction implementation."""

from collections import defaultdict
from typing import Any, Callable, Iterator

from prolif.fingerprint import Fingerprint
from prolif.ifp import IFP, InteractionData


class WaterBridge:
"""Implementation of the WaterBridge analysis for trajectories.
Parameters
----------
parameters : dict
Parameters for the HBDonor and HBAcceptor interactions passed to the underlying
fingerprint generator. See
:class:`prolif.fingerprint.Fingerprint` for more details.
count : bool
Whether to generate a count fingerprint or just a binary one.
ifp_store : dict
Container for the results.
kwargs : Any
Additional arguments passed at runtime to the fingerprint generator's ``run``
method.
"""

def __init__(
self,
parameters: dict | None = None,
count: bool = False,
ifp_store: dict[int, IFP] | None = None,
**kwargs: Any,
):
kwargs.pop("n_jobs", None)
self.residues = kwargs.pop("residues", None)
self.water_fp = Fingerprint(
interactions=["HBDonor", "HBAcceptor"],
parameters=parameters,
count=count,
)
self.kwargs = kwargs
self.ifp = {} if ifp_store is None else ifp_store

def run(self, traj, lig, prot, water, order=1) -> dict[int, IFP]:
"""Run the water bridge analysis.
Parameters
----------
traj : MDAnalysis.coordinates.base.ProtoReader or MDAnalysis.coordinates.base.FrameIteratorSliced
Iterate over this Universe trajectory or sliced trajectory object
to extract the frames used for the fingerprint extraction
lig : MDAnalysis.core.groups.AtomGroup
An MDAnalysis AtomGroup for the ligand
prot : MDAnalysis.core.groups.AtomGroup
An MDAnalysis AtomGroup for the protein (with multiple residues)
water: MDAnalysis.core.groups.AtomGroup
An MDAnalysis AtomGroup for the water molecules
order: int
Treshold for water bridge order
""" # noqa: E501
# Run analysis for ligand-water and water-protein interactions
lig_water_ifp: dict[int, IFP] = self.water_fp._run_serial(
traj, lig, water, residues=None, **self.kwargs
)
water_prot_ifp: dict[int, IFP] = self.water_fp._run_serial(
traj, water, prot, residues=self.residues, **self.kwargs
)

# Run water-water interaction analysis if order is 2 or above
if order >= 2:
water_ifp: dict[int, IFP] | None = self.water_fp._run_serial(
traj, water, water, residues=None, **self.kwargs
)
else:
water_ifp = None

# first order
self._merge_metadata(lig_water_ifp, None, water_prot_ifp)

# add metadata for higher order
if water_ifp:
self._merge_metadata(lig_water_ifp, water_ifp, water_prot_ifp)

return self.ifp

def _merge_metadata(
self,
lig_water_ifp: dict[int, IFP],
water_ifp: dict[int, IFP] | None,
water_prot_ifp: dict[int, IFP],
) -> None:
"""Merge results from all fingerprints on matching water residues"""
for frame in lig_water_ifp:
ifp_lw = lig_water_ifp[frame] # Ligand → Water
ifp_wp = water_prot_ifp[frame] # Water → Protein
ifp_ww = water_ifp[frame] if water_ifp else None # Water → Water

# for each ligand-water1 interaction
for data_lw in ifp_lw.interactions():
# for each water1-water2 interaction (optional)
for data_ww in self._filter_interactions(
ifp_ww,
# match with ligand-water1 and exclude interaction with itself
lambda x: x.ligand == data_lw.protein and x.ligand != x.protein,
# if water_ifp is None, simply yields data_lw
default=data_lw,
):
# for each water2-protein interaction where water1 == water2
for data_wp in self._filter_interactions(
ifp_wp, lambda x: x.ligand == data_ww.protein
):
# get indices for individual water molecules
water_indices = defaultdict(set)
for data, role in [
(data_lw, "protein"),
(data_wp, "ligand"),
*(
((data_ww, "ligand"), (data_ww, "protein"))
if ifp_ww
else ()
),
]:
resid = getattr(data, role)
water_indices[f"water_{resid}"].update(
data.metadata["indices"][role]
)
# construct merged metadata
metadata = {
"indices": {
"ligand": data_lw.metadata["indices"]["ligand"],
"protein": data_wp.metadata["indices"]["protein"],
**{
key: tuple(indices)
for key, indices in water_indices.items()
},
},
"parent_indices": {
"ligand": data_lw.metadata["parent_indices"]["ligand"],
"protein": data_wp.metadata["parent_indices"][
"protein"
],
"water": tuple(
set().union(
data_lw.metadata["parent_indices"]["protein"],
data_wp.metadata["parent_indices"]["ligand"],
*(
(
data_ww.metadata["parent_indices"][
"ligand"
],
data_ww.metadata["parent_indices"][
"protein"
],
)
if ifp_ww
else ()
),
)
),
},
"water_residues": tuple(
{
data_lw.protein,
data_ww.ligand,
data_ww.protein,
data_wp.ligand,
}
if ifp_ww
else (data_lw.protein,)
),
"order": 2 if ifp_ww else 1,
"ligand_role": data_lw.interaction,
"protein_role": ( # invert role
"HBDonor"
if data_wp.interaction == "HBAcceptor"
else "HBAcceptor"
),
**{
f"{key}{suffix}": data.metadata[key]
for suffix, data in ([
("_ligand_water", data_lw),
("_water_protein", data_wp),
*(("_water_water", data_ww) if ifp_ww else ()),
])
for key in ["distance", "DHA_angle"]
},
}

# store metadata
ifp = self.ifp.setdefault(frame, IFP())
ifp.setdefault(
(data_lw.ligand, data_wp.protein), {}
).setdefault("WaterBridge", []).append(metadata)

@staticmethod
def _filter_interactions(
ifp: IFP | None,
predicate: Callable[[InteractionData], bool],
default: InteractionData | None = None,
) -> Iterator[InteractionData]:
"""Filters interactions to those that satisfy the predicate. If ``ifp==None``,
simply yields the ``default`` value.
"""
if ifp is None:
yield default
else:
for data in ifp.interactions():
if predicate(data):
yield data

0 comments on commit 4e34243

Please sign in to comment.